//{{{}}}

//{{{  imports
import java.io.*;
import java.util.*;
import java.lang.*;
//}}}

//{{{  public class BUFFER_OF_INT {
/**
 * A fixed-capacity, blocking, fair FIFO buffer of {@code int} values.
 *
 * <p>Thread-safety: any number of threads may call {@link #read()} and
 * {@link #write(int)} concurrently.
 */
public class BUFFER_OF_INT {

  //{{{  COMMENT specification
  //
  //BUFFER_OF_INT implements a blocking FIFO buffer of integers.  A fixed size
  //is defined upon initialisation.  There can be any number of concurrent
  //readers and writers.  Readers are blocked when the buffer is empty.  Writers
  //are blocked when the buffer is full.  A non-empty buffer will not refuse
  //a reader.  A non-full buffer will not refuse a writer.
  //
  //The buffer is also `fair' -- i.e. readers are dealt with in the order of
  //their arrival ... same for writers.  Contention between readers and writers
  //is dealt with in an arbitrary fashion ... but any unfairness is limited by
  //the size of the buffer (only a finite number of reads can take place without
  //a write and vice-versa).
  //
  //The buffer is `non-busy' -- i.e. there are no polling loops (e.g. by a reader
  //waiting for the buffer to become non-empty).  Blocking is entirely passive.
  //
  //}}}

  //{{{  COMMENT implementation
  //
  //Two local monitors, read_monitor and write_monitor, are declared.
  //
  //Readers call the read method and, then, queue up for the read_monitor.  When
  //a reader acquires this, it queues up for the monitor associated with the
  //BUFFER_OF_INT object itself, where its only competitor may be a single writer
  //that has acquired its write_monitor.  When it acquires the BUFFER_OF_INT
  //monitor, it may have to wait() because the buffer turned out to be empty.
  //This wait() releases the BUFFER_OF_INT monitor, allowing a writer in, but does
  //not release the read_monitor first acquired.  This forces the other readers
  //to wait patiently in line and stops them overtaking (perhaps infinitely often)
  //the waiting reader.  After completing its read, any waiting writer is notified
  //to continue.
  //
  //The writers' story is symmetric to the above.
  //
  //It is important that the public read and write methods are not synchronized.
  //Otherwise, a suspended reader would block all writers and vice-versa!
  //
  //Each wait() sits inside a loop that re-tests its condition, so a spurious
  //wake-up or an interrupt can never let a reader consume from an empty buffer
  //or a writer overwrite a full one.  An interrupt received while blocked does
  //not abort the operation (the method signatures declare no checked exception);
  //the thread's interrupt status is re-asserted just before the method returns.
  //
  //}}}

  //{{{  local state
  int[] buffer;
  int max;

  int size = 0;                    // INVARIANT: (0 <= size <= max)
  int hi = 0;                      // INVARIANT: (0 <= hi < max)
  int lo = 0;                      // INVARIANT: (0 <= lo < max)

  boolean waiting_reader = false;  // INVARIANT: waiting_reader ==> (size = 0)
  boolean waiting_writer = false;  // INVARIANT: waiting_writer ==> (size = max)

  final Object read_monitor =      // all readers multiplex through this
    new Object ();

  final Object write_monitor =     // all writers multiplex through this
    new Object ();
  //}}}

  //{{{  constructor
  /**
   * Creates an empty buffer able to hold {@code max} integers.
   *
   * @param max the capacity of the buffer; must be at least 1
   * @throws IllegalArgumentException if {@code max} is zero or negative
   *     (a zero-capacity buffer would block every reader and writer forever)
   */
  BUFFER_OF_INT (int max) {
    if (max <= 0) {
      throw new IllegalArgumentException (
        "BUFFER_OF_INT: capacity must be positive, but was " + max);
    }
    this.max = max;
    buffer = new int[max];
  }
  //}}}

  //{{{  public int read () {
  /**
   * Removes and returns the oldest integer in the buffer, blocking passively
   * while the buffer is empty.
   *
   * <p>If the calling thread is interrupted while blocked, it keeps waiting
   * until a value is available and returns with its interrupt status set.
   *
   * @return the integer that has been in the buffer the longest
   */
  public int read () {
    synchronized (read_monitor) {        // readers queue here, one at a time
      synchronized (this) {
        boolean interrupted = false;
        while (size == 0) {              // loop: guards against spurious wake-ups
          waiting_reader = true;
          try {
            wait ();                     // releases `this' but keeps read_monitor
          } catch (InterruptedException e) {
            // Catching clears the interrupt status, so the next wait() blocks
            // normally; remember the interrupt and restore it on the way out.
            interrupted = true;
          }
        }
        int value = buffer[lo];          // ASSERT: size > 0
        lo = (lo + 1) % max;
        size--;
        if (waiting_writer) {            // ASSERT: size == (max - 1)
          waiting_writer = false;
          notify ();                     // at most one thread waits on `this'
        }
        if (interrupted) {
          Thread.currentThread ().interrupt ();
        }
        return value;
      }
    }
  }
  //}}}

  //{{{  public void write (int n) {
  /**
   * Appends {@code n} to the buffer, blocking passively while the buffer is
   * full.
   *
   * <p>If the calling thread is interrupted while blocked, it keeps waiting
   * until space is available and returns with its interrupt status set.
   *
   * @param n the integer to append
   */
  public void write (int n) {
    synchronized (write_monitor) {       // writers queue here, one at a time
      synchronized (this) {
        boolean interrupted = false;
        while (size == max) {            // loop: guards against spurious wake-ups
          waiting_writer = true;
          try {
            wait ();                     // releases `this' but keeps write_monitor
          } catch (InterruptedException e) {
            // Catching clears the interrupt status, so the next wait() blocks
            // normally; remember the interrupt and restore it on the way out.
            interrupted = true;
          }
        }
        buffer[hi] = n;                  // ASSERT: size < max
        hi = (hi + 1) % max;
        size++;
        if (waiting_reader) {            // ASSERT: size == 1
          waiting_reader = false;
          notify ();                     // at most one thread waits on `this'
        }
        if (interrupted) {
          Thread.currentThread ().interrupt ();
        }
      }
    }
  }
  //}}}

}
//}}}