Використання семафорiв для сигналiзацiї

Семафори дуже зручно використовувати для передачi сигналiв мiж процесами.

class SendingThread extends Thread { Semaphore semaphore = null; SendingThread( Semaphore semaphore) { this.semaphore = semaphore; System.out.println( "SendingThread created"); } public void run() { System.out.println( "SendingThread started"); for (int i = 0; i < 6; ++i) { System.out.println( "SendingThread do(" + i + ")"); this.semaphore.take( i); } System.out.println( "SendingThread stoped"); } } class ReceivingThread extends Thread { Semaphore semaphore = null; ReceivingThread( Semaphore semaphore) { this.semaphore = semaphore; System.out.println( "ReceivingThread created"); } public void run() { System.out.println( "ReceivingThread started"); while(true) { try { int m = this.semaphore.release(); System.out.println( "ReceivingThread do(" + m + ")"); } catch (InterruptedException ie) { } } } } class Semaphore { private boolean signal = false; private int message = 0; public synchronized void take( int m) { this.signal = true; this.notify(); message = m; System.out.println( "Semaphore take"); } public synchronized int release() throws InterruptedException{ while(!this.signal) { wait(); } this.signal = false; System.out.println( "Semaphore release"); return( message); } } public class SignalingSemaphore { public static void main( String argc[]) { System.out.println( "Main process started"); Semaphore semaphore = new Semaphore(); SendingThread sender = new SendingThread( semaphore); ReceivingThread receiver = new ReceivingThread( semaphore); receiver.start(); sender.start(); System.out.println( "Main process ended"); } }

У цьому прикладi два процеси sender i receiver використовують семафор semaphore для передачi повiдомлень один одному.

Пiсля створення та запуску процес sender послiдовно генерує п'ять повiдомлень та за допомогою методу take() надсилає їх до процеса receiver.

Процес receiver отримує цi повiдомлення за допомогою методу release().

Для коректного отримання повiдомлень процес receiver повинен постiйно перебувати у станi чекання.

Тому його треба запустити першим, та зупинити пiсля отримання усiх повiдомлень за допомогою комбiнацiї клавiш CTRL+C.

 

Рахуючий семафор

Iнодi виникає необхiднiсть у пiдрахунку кiлькостi надiсланих / переданих сигналiв.

Для цого використовують так званi “рахуючi семафори”.

У таких семафорах замiсть булевого буферу використовується цiлочисельний.

class SendingThread extends Thread { CountingSemaphore semaphore = null; SendingThread( CountingSemaphore semaphore) { this.semaphore = semaphore; System.out.println( "SendingThread created"); } public void run() { System.out.println( "SendingThread started"); for (int i = 0; i < 6; ++i) { System.out.println( "SendingThread do(" + i + ")"); this.semaphore.take( i); } System.out.println( "SendingThread stoped"); } } class ReceivingThread extends Thread { CountingSemaphore semaphore = null; ReceivingThread( CountingSemaphore semaphore) { this.semaphore = semaphore; System.out.println( "ReceivingThread created"); } public void run() { int sum = 0; System.out.println( "ReceivingThread started"); while(true) { try { int m = this.semaphore.release(); sum += m; System.out.println( "ReceivingThread do(" + m + "): " + sum); } catch (InterruptedException ie) { } } } } class CountingSemaphore { private int signals = 0; private int message = 0; public synchronized void take( int m) { this.signals++; message = m; this.notify(); System.out.println( "CountingSemaphore(take): " + this.signals); } public synchronized int release() throws InterruptedException { while(this.signals == 0) { wait(); } this.signals--; System.out.println( "CountingSemaphore(release): " + this.signals); return( message); } } public class CountingSemaphoreDemo { public static void main( String argc[]) { System.out.println( "Main process started"); CountingSemaphore semaphore = new CountingSemaphore(); SendingThread sender1 = new SendingThread( semaphore); SendingThread sender2 = new SendingThread( semaphore); ReceivingThread receiver = new ReceivingThread( semaphore); receiver.start(); sender1.start(); sender2.start(); System.out.println( "Main process ended"); } }

Це дозволяє накопичувати iнформацiю про кiлькiсть повiдомлень, що переданi семафором.

 

Обмежуючий семафор

Рахуючий семафор лише пiдраховує кiлькiсть комунiацiйних повiдомлень.

Прицьому вiн не накладає нiяких обмежень на кiлькiсть процесiв, якi обмiнюються повiдомленнями або мають доступ до критичного ресурсу.

Але дуже часто виникає необхiднiсть у обмеженнi кiлькостi комунiкацiй мiж процесами, або мiж процесом i критичним ресурсом.

Це можна здiйснити за допомогою так званого “обмежуючого семафору”.

class SendingThread extends Thread { BoundedSemaphore semaphore = null; String name; long procNum; SendingThread( String n, long p, BoundedSemaphore semaphore) { this.semaphore = semaphore; name = n; procNum = p; System.out.println( name + ": created"); } public void run() { System.out.println( name + ": started"); for (int i = 0; i < 4; ++i) { System.out.println( name + ": do(" + (i * procNum) + ")"); try { this.semaphore.take( i * procNum); } catch (InterruptedException ie) { } } System.out.println( name + ": stoped"); } } class ReceivingThread extends Thread { BoundedSemaphore semaphore = null; String name; int procNum; long sum = 0; ReceivingThread( String n, int p, BoundedSemaphore semaphore) { this.semaphore = semaphore; name = n; procNum = p; System.out.println( name + ": created"); } public void run() { System.out.println( name + ": started"); while(true) { try { long m = this.semaphore.release(); sum += m; System.out.println( name + ": do(" + m + "), " + sum); } catch (InterruptedException ie) { } } } } class BoundedSemaphore { private int signals = 0; private int bound = 0; private long message = 0; public BoundedSemaphore( int upperBound) { this.bound = upperBound; } public synchronized void take( long m) throws InterruptedException { while(this.signals == bound) { wait(); } this.signals++; System.out.println( "BoundedSemaphore(take): " + this.signals); message = m; this.notify(); } public synchronized long release() throws InterruptedException { while(this.signals == 0) { wait(); } this.signals--; System.out.println( "BoundedSemaphore(release): " + this.signals); long m = message; message = 0; this.notify(); return( m); } } public class BoundedSemaphoreDemo { public static void main( String argc[]) { System.out.println( "Main process started"); BoundedSemaphore semaphore = new BoundedSemaphore( 3); SendingThread sender1 = new SendingThread( "Sender 1", 1, semaphore); SendingThread sender2 = new SendingThread( "Sender 2", 10, semaphore); SendingThread sender3 = new SendingThread( "Sender 3", 100, semaphore); SendingThread sender4 = new SendingThread( "Sender 4", 1000, semaphore); SendingThread sender5 = new SendingThread( "Sender 5", 10000, semaphore); SendingThread sender6 = new SendingThread( "Sender 6", 100000, semaphore); ReceivingThread receiver = new ReceivingThread( "Receiver", 1, semaphore); receiver.start(); sender1.start(); sender2.start(); sender3.start(); sender4.start(); sender5.start(); sender6.start(); System.out.println( "Main process ended"); } }

До складу атрибутiв такого семафору входить не тiльки рахiвник сигналiв signals, але й цiлочисельний атрибут bound.

Цей атрибут дозволяє встановити верхню межу кiлькостi процесiв, якi одночасно можуть надсилати повiдомлення, або мати доступ до критичного ресурсу.

 

Завдання

  1. Розглянути, вiдкомпiлювати та запустити на виконання наведенi приклади.
  2. З'ясувати принципи багатопоточної синхронiзацiї за допомогою семафорiв.
  3. З'ясувати особливостi програмування та застосування сигнальних, рахуючих та обмежуючих семафорiв.

 


 

Лабораторна робота №7.