|
CSP for Java (JCSP) 1.0-rc4 |
||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
SUMMARY: INNER | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Object | +--jcsp.lang.Barrier
This enables barrier synchronisation between a set of processes.
Shortcut to the Constructor and Method Summaries.
Alternative
.
A process network may contain many barriers - each being associated with a different subset of processes. These subsets may overlap and change at run-time. JCSP does not currently provide a checked way of associating a process with a particular barrier. That could be done, but would carry a syntactic and run-time overhead currently not thought worthwhile. Associating a barrier with its correct set of processes is a design issue and it is left to the designer to ensure correct useage.
Note: this notion of barrier corresponds to the EVENT synchronisation primitive added to the KRoC occam language system.
For fixed barrier sets,
each barrier initialised
to the number of processes to be associated with it and share it out amongst those processes.
For example, here is a fixed set of 10 processes synchronising on a shared barrier:
[===================================================================] barrier | | | ______|______ ______|______ ______|______ | | | | | | | Player(0) | | Player(1) | ... | Player(9) | |_____________| |_____________| |_____________|Here is the JCSP code for this network:
import jcsp.lang.*; public class BarrierExample1 { public static void main (String[] args) { final int nPlayers = 10; final Barrier barrier = new Barrier (nPlayers); final Player[] players = new Player[nPlayers]; for (int i = 0; i < players.length; i++) { players[i] = new Player (i, nPlayers, barrier); } new Parallel (players).run (); } }To synchronise on a barrier, a process just needs to invoke its
sync
method. For example:
import jcsp.lang.*; public class Player implements CSProcess { private final int id, nPlayers; private final Barrier barrier; public Player (int id, int nPlayers, Barrier barrier) { this.id = id; this.nPlayers = nPlayers; this.barrier = barrier; } public void run () { final CSTimer tim = new CSTimer (); final long second = 1000; // JCSP timer units are milliseconds int busy = id + 1; while (true) { tim.sleep (busy*second); // application specific work System.out.println ("Player " + id + " at the barrier ..."); barrier.sync (); System.out.println ("\t\t\t... Player " + id + " over the barrier"); busy = (nPlayers + 1) - busy; // just to make it more interesting } } }The sleep period above represents some work carried out by each Player. This work takes a different amount of time in each cycle and varies from player to player. At the end of each piece of work, each player waits for all its colleagues before continuing its next cycle.
enroll
or resign
from
any barrier it can see. It should not, of course, enroll on a barrier with which
it is already associated - nor resign from a barrier with which it isn't! Because
these operations are internal choices of individual processes and because they have an impact on
the synchronisation properties of their environment, the resulting system is non-deterministic.
______________ | | | TimeKeeper | |______________| | [===================================================================] barrier | | | ______|______ ______|______ ______|______ | | | | | | | Worker(0) | | Worker(1) | ... | Worker(9) | |_____________| |_____________| |_____________|In the above example, Worker processes cycle between working and resting states, making their own decisions about when to switch. When working, they enroll in a barrier shared with a TimeKeeper process - when resting, they resign from this barrier. Whilst working and after they have enrolled, they execute a sequence of work units triggered by synchronisations on the barrier.
The TimeKeeper synchronises on the barrier at a regular rate (once per second) and, thus, coordinates the activities of all working Workers. A work unit can only start at the beginning of one of the TimeKeeper's time slots and each Worker can only perform one work unit per time slot. Should any work unit overrun a time slot, subsequent units (for all Workers) will have a late start. However, the system is stable - so long as there is some slack in the system (i.e. units do not generally overrun), the original schedule will be recovered.
Here is the code for the complete system. The barrier is initialised to just 1, since only the TimeKeeper is permanently associated with it. The barrier is passed to all Workers as well as to the TimeKeeper:
import jcsp.lang.*; public class BarrierExample2 { public static void main (String[] args) { final int nWorkers = 10; final int rogue = 5; final int second = 1000; // JCSP timer units are milliseconds final int tick = 1*second; final int maxWork = tick; // raise this to allow workers to overrun final long seed = new CSTimer ().read (); final Barrier barrier = new Barrier (1); final TimeKeeper timeKeeper = new TimeKeeper (tick, barrier); final Worker[] workers = new Worker[nWorkers]; for (int i = 0; i < workers.length; i++) { workers[i] = new Worker (i, i + seed, maxWork, i == rogue, barrier); } new Parallel ( new CSProcess[] { timeKeeper, new Parallel (workers) } ).run (); } }As well as the barrier, each Worker is given its id, a (unique) seed for its random number generator, its maximum work unit time and whether it is a rogue. A rogue worker deliberately overruns its last unit of work for each working session to test out the stability of the system:
import jcsp.lang.*; import java.util.*; public class Worker implements CSProcess { private final int id; private final long seed; private final int maxWork; private final boolean rogue; private final Barrier barrier; public Worker (int id, long seed, int maxWork, boolean rogue, Barrier barrier) { this.id = id; this.seed = seed; this.maxWork = maxWork; this.rogue = rogue; this.barrier = barrier; } public void run () { final Random random = new Random (seed); // each process gets a different seed final CSTimer tim = new CSTimer (); final int second = 1000; // JCSP timer units are milliseconds final int minRest = 3*second; final int maxRest = (id + 10)*second; final int nWorkUnits = id + 1; final String starting = "\tWorker " + id + " starting ..."; final String working = "\t\t\t ... Worker " + id + " working ..."; final String resting = "\t\t\t\t\t ... Worker " + id + " resting ..."; while (true) { barrier.enroll (); System.out.println (starting); for (int i = 0; i < nWorkUnits; i++) { barrier.sync (); System.out.println (working); // these lines represent tim.sleep (random.nextInt (maxWork)); // one unit of work } if (rogue) tim.sleep (maxWork); // try to throw the timekeeper barrier.resign (); System.out.println (resting); tim.sleep (minRest + random.nextInt (maxRest)); } } }Note that the
resign
method also performs a (non-blocking) synchronisation
on the barrier as well as the resignation. This is crucial since, if the resigner were
the last process associated with a barrier not to have invoked a sync
,
its resignation must complete the barrier (as though it had invoked a sync)
and release all the remaining associated processes.
The TimeKeeper is passed its tick interval and the Barrier. It is pre-enrolled with the Barrier and remains permanently associated:
import jcsp.lang.*; public class TimeKeeper implements CSProcess { private final long interval; private final Barrier barrier; public TimeKeeper (long interval, Barrier barrier) { this.interval = interval; this.barrier = barrier; } public void run () { final CSTimer tim = new CSTimer (); long timeout = tim.read () + interval; while (true) { tim.after (timeout); barrier.sync (); System.out.println ("[" + (tim.read () - timeout) + "]"); timeout += interval; } } }The print statement from the TimeKeeper gives an upper bound on how far each timeslot strays from its schedule. JCSP
CSTimer
s are currently implemented
on top of standard Java APIs (Thread.sleep and Object.wait).
Depending on the underlying JVM, this should stay close to zero (milliseconds) - except
when the rogue Worker deliberately overruns a work unit.
Other events may also disturb the schedule - e.g. a Ctl-S/Ctl-Q from
the user to pause/resume output or some transient fit of activity from
the operating system. Some JVMs also return early from some timeouts - i.e. the timeslot
starts early, which gives rise to an occasional negative report from the TimeKeeper.
Bear also in mind that the TimeKeeper's print statement has to compete with the print statements from all working Workers. All are scheduled to execute at the start of each timeslot and may be arbitrarilly interleaved. This may be confusing when interpreting the output from the system.
To clarify what's happening, we can arrange for the TimeKeeper's message to be printed first for each timeslot, before any from the Workers. To do this, we need to stall those Workers temporarilly until we know that the TimeKeeper has reported. A simple way to do that is to double up on the barrier synchronisation. For the Worker, modify its working loop:
for (int i = 0; i < nWorkUnits; i++) { barrier.sync (); // wait for everyone barrier.sync (); // wait for the Timekeeper to report System.out.println (working); tim.sleep (random.nextInt (maxWork)); }For the TimeKeeper, modify its run loop:
while (true) { tim.after (timeout); barrier.sync (); // wait for everyone System.out.println ("[" + (tim.read () - timeout) + "]"); barrier.sync (); // let the Workers get going timeout += interval; }
Going back to the original example, the entire barrier synchronisation could be discarded by dropping the TimeKeeper and making each Worker responsible for its own time schedule. However, setting n timeouts (where each setting has O(n) overheads) needs to be compared against setting 1 timeout (by the TimeKeeper) together with a (n+1)-way barrier synchronisation.
For the current implementation, the enroll
and resign
operations - together with most of the sync
s - have unit time
costs. The final sync, which releases all the other (n)
processes blocked on the barrier, takes O(n) time. The unit time costs
for this implementation are comparable with those of a synchronized method
invocation followed by an Object.wait.
[Note: CSP synchronisation primitives can be implemented with much lighter overheads. For example, the KRoC occam equivalent to this Barrier (its EVENT) has (sub-microsecond) unit time costs for all its operations, including the final sync. Future work on JCSP may look towards this standard.]
Bucket
,
One2OneChannel
, Serialized FormConstructor Summary | |
Barrier()
Construct a barrier initially associated with no processes. |
|
Barrier(int nEnrolled)
Construct a barrier (initially) associated with nEnrolled processes. |
Method Summary | |
void |
enroll()
Associate the invoking process with this barrier. |
void |
reset(int nEnrolled)
Reset this barrier to be associated with nEnrolled processes. |
void |
resign()
Disassociate the invoking process from this barrier. |
void |
sync()
Synchronise the invoking process on this barrier. |
Methods inherited from class java.lang.Object |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Constructor Detail |
public Barrier()
public Barrier(int nEnrolled)
nEnrolled
- the number of processes (initially) associated with this barrier.Method Detail |
public void reset(int nEnrolled)
nEnrolled
- the number of processes reset to this barrier.public void sync()
public void enroll()
public void resign()
sync
, its resignation completes
the barrier (as though it has invoked a sync) and releases all
the remaining associated processes.
|
CSP for Java (JCSP) 1.0-rc4 |
||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
SUMMARY: INNER | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |