//{{{}}} // package occam; //{{{ public class Object_Channel { public class Object_Channel { //{{{ COMMENT documentation // //Channel extends an occam CHAN OF OBJECT for a single reader and multiple //writers. // //It now allows ALTing by that single reader (multiple writers are allowed but //cannot themselves ALT). This implementation follows closely the transputer //implementation -- ALTing is passive (i.e. no busy-waits). // //There is full synchronisation between a reading and writing thread. Any //thread may read or write on this Channel. Multiple writers are queued, //allowing only one at a time to find a reader. Only a single reader is //catered for and this may back off if it selects another Channel. Writers //are not allowed to back off. A reader only completes when it finds a writer. //A writer only completes when it gets to the front of its queue and finds //a reader. // //There is no logical buffering of data in the Channel. // //Note: only the `read' and `write' methods are public. The `enable' and //`disable' Channels are called only from an Alternative object (by the //ALTing process). It is deliberate that `disable' is not synchronised. // //}}} //{{{ local state private Object channel_hold; // buffer (not detectable to users) protected boolean channel_empty = true; // synchronisation flag private Object write_monitor = // all writers multiplex through this new Object (); private Alternative alt; // state of reader //}}} //{{{ public synchronized Object read () throws InterruptedException { public synchronized Object read () throws InterruptedException { if (channel_empty) { channel_empty = false; // first to the rendezvous wait (); // wait for the writer thread notify (); // schedule the writer to finish } else { channel_empty = true; // second to the rendezvous notify (); // schedule the waiting writer thread } return channel_hold; } //}}} //{{{ public void write (Object n) throws InterruptedException { public void write (Object n) throws InterruptedException { synchronized (write_monitor) { synchronized (this) { Alternative tmp_alt = alt; // avoid race-hazard on the volatile alt channel_hold = n; if (tmp_alt != null) { // a reader was ALTing on this Channel channel_empty = false; // first to the rendezvous tmp_alt.schedule (); // tell the reader we are here wait (); // wait for the reader thread } else if (channel_empty) { channel_empty = false; // first to the rendezvous wait (); // wait for the reader thread } else { channel_empty = true; // second to the rendezvous notify (); // schedule the waiting reader thread wait (); // let the reader regain this monitor } } } } //}}} //{{{ synchronized boolean enable (Alternative alt) { synchronized boolean enable (Alternative alt) { if (channel_empty) { this.alt = alt; return false; } else { return true; } } //}}} //{{{ boolean disable () { boolean disable () { alt = null; return !channel_empty; } //}}} } //}}}