CSP for Java (JCSP) 1.1-rc4 |
java.lang.Object
  org.jcsp.lang.Barrier
public class Barrier
This enables barrier synchronisation between a set of processes.
Barrier synchronisation cannot be used as a guard in an Alternative. Applications needing that capability should use an AltingBarrier.
A process network may contain many barriers - each being associated with a different subset of processes. These subsets may overlap and change at run-time. JCSP does not currently provide a checked way of associating a process with a particular barrier. That could be done, but would carry a syntactic and run-time overhead currently not thought worthwhile. Associating a barrier with its correct set of processes is a design issue and it is left to the designer to ensure correct usage.
Note: this notion of barrier corresponds to the BARRIER synchronisation primitive added to the KRoC occam-pi language system.
For fixed barrier sets, construct each barrier initialised to the number of processes to be associated with it and share it out amongst those processes.
For example, here is a fixed set of 10 processes synchronising on a shared barrier:

    import org.jcsp.lang.*;

    public class BarrierExample1 {

      public static void main (String[] args) {

        final int nPlayers = 10;

        final Barrier barrier = new Barrier (nPlayers);

        final Player[] players = new Player[nPlayers];
        for (int i = 0; i < players.length; i++) {
          players[i] = new Player (i, nPlayers, barrier);
        }

        new Parallel (players).run ();

      }

    }

To synchronise on a barrier, a process just needs to invoke its sync method. For example:

    import org.jcsp.lang.*;

    public class Player implements CSProcess {

      private final int id, nPlayers;
      private final Barrier barrier;

      public Player (int id, int nPlayers, Barrier barrier) {
        this.id = id;
        this.nPlayers = nPlayers;
        this.barrier = barrier;
      }

      public void run () {

        final CSTimer tim = new CSTimer ();
        final long second = 1000;          // JCSP timer units are milliseconds
        int busy = id + 1;

        while (true) {
          tim.sleep (busy*second);         // application specific work
          System.out.println ("Player " + id + " at the barrier ...");
          barrier.sync ();
          System.out.println ("\t\t\t... Player " + id + " over the barrier");
          busy = (nPlayers + 1) - busy;    // just to make it more interesting
        }

      }

    }

The sleep period above represents some work carried out by each Player. This work takes a different amount of time in each cycle and varies from player to player. At the end of each piece of work, each player waits for all its colleagues before continuing its next cycle.
For dynamic barrier sets, a process may enroll on, or resign from, any barrier it can see. It should not, of course, enroll on a barrier with which it is already associated - nor resign from a barrier with which it isn't! Because these operations are internal choices of individual processes and because they have an impact on the synchronisation properties of their environment, the resulting system is non-deterministic.
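For readers who know the standard library better than JCSP, the enroll / resign / sync operations have a rough analogue in java.util.concurrent.Phaser (register / arriveAndDeregister / arriveAndAwaitAdvance). The sketch below is not JCSP code, and the class name PhaserAnalogue is made up for illustration. It only shows how the set of synchronising parties can grow and shrink at run-time:

```java
import java.util.concurrent.Phaser;

// Illustration only: uses java.util.concurrent.Phaser, not org.jcsp.lang.Barrier.
class PhaserAnalogue {

    // Returns { parties after enrolling, parties after resigning, phase after the final sync }.
    static int[] demo() {
        final Phaser phaser = new Phaser(1);      // roughly: new Barrier (1), one permanent party
        phaser.register();                        // roughly: barrier.enroll (), a second party joins
        final int afterEnroll = phaser.getRegisteredParties();
        phaser.arriveAndDeregister();             // roughly: barrier.resign (), the second party leaves
        final int afterResign = phaser.getRegisteredParties();
        phaser.arriveAndAwaitAdvance();           // roughly: barrier.sync (), the only remaining
                                                  // party completes the phase without blocking
        return new int[] { afterEnroll, afterResign, phaser.getPhase() };
    }

    public static void main(String[] args) {
        final int[] r = demo();
        System.out.println("parties after enroll: " + r[0]);
        System.out.println("parties after resign: " + r[1]);
        System.out.println("phase after sync:     " + r[2]);
    }
}
```

The mapping is approximate: a Phaser tracks registered parties by count only, much as this Barrier does not check which process is associated with it.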
The TimeKeeper synchronises on the barrier at a regular rate (once per second) and, thus, coordinates the activities of all working Workers. A work unit can only start at the beginning of one of the TimeKeeper's time slots and each Worker can only perform one work unit per time slot. Should any work unit overrun a time slot, subsequent units (for all Workers) will have a late start. However, the system is stable - so long as there is some slack in the system (i.e. units do not generally overrun), the original schedule will be recovered.
Here is the code for the complete system. The barrier is initialised to just 1, since only the TimeKeeper is permanently associated with it. The barrier is passed to all Workers as well as to the TimeKeeper:

    import org.jcsp.lang.*;

    public class BarrierExample2 {

      public static void main (String[] args) {

        final int nWorkers = 10;
        final int rogue = 5;

        final int second = 1000;           // JCSP timer units are milliseconds
        final int tick = 1*second;
        final int maxWork = tick;          // raise this to allow workers to overrun

        final long seed = new CSTimer ().read ();

        final Barrier barrier = new Barrier (1);

        final TimeKeeper timeKeeper = new TimeKeeper (tick, barrier);

        final Worker[] workers = new Worker[nWorkers];
        for (int i = 0; i < workers.length; i++) {
          workers[i] = new Worker (i, i + seed, maxWork, i == rogue, barrier);
        }

        new Parallel (
          new CSProcess[] {
            timeKeeper,
            new Parallel (workers)
          }
        ).run ();

      }

    }

As well as the barrier, each Worker is given its id, a (unique) seed for its random number generator, its maximum work unit time and whether it is a rogue. A rogue worker deliberately overruns its last unit of work for each working session to test out the stability of the system:

    import org.jcsp.lang.*;
    import java.util.*;

    public class Worker implements CSProcess {

      private final int id;
      private final long seed;
      private final int maxWork;
      private final boolean rogue;
      private final Barrier barrier;

      public Worker (int id, long seed, int maxWork, boolean rogue, Barrier barrier) {
        this.id = id;
        this.seed = seed;
        this.maxWork = maxWork;
        this.rogue = rogue;
        this.barrier = barrier;
      }

      public void run () {

        final Random random = new Random (seed);    // each process gets a different seed

        final CSTimer tim = new CSTimer ();
        final int second = 1000;                    // JCSP timer units are milliseconds

        final int minRest = 3*second;
        final int maxRest = (id + 10)*second;
        final int nWorkUnits = id + 1;

        final String starting = "\tWorker " + id + " starting ...";
        final String working = "\t\t\t ... Worker " + id + " working ...";
        final String resting = "\t\t\t\t\t ... Worker " + id + " resting ...";

        while (true) {
          barrier.enroll ();
          System.out.println (starting);
          for (int i = 0; i < nWorkUnits; i++) {
            barrier.sync ();
            System.out.println (working);
            tim.sleep (random.nextInt (maxWork));   // these lines represent one unit of work
          }
          if (rogue) tim.sleep (maxWork);           // try to throw the timekeeper
          barrier.resign ();
          System.out.println (resting);
          tim.sleep (minRest + random.nextInt (maxRest));
        }

      }

    }

Note that the resign method also performs a (non-blocking) synchronisation on the barrier as well as the resignation. This is crucial since, if the resigner were the last process associated with a barrier not to have invoked a sync, its resignation must complete the barrier (as though it had invoked a sync) and release all the remaining associated processes.
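To make the rule above concrete, here is a minimal sketch of a counting barrier built on a Java monitor, in which resign completes the barrier when the resigner is the last outstanding process. It is not the JCSP source: SketchBarrier, ResignDemo and their fields are hypothetical names, and nothing here is claimed about how org.jcsp.lang.Barrier is actually written.

```java
// Hypothetical sketch, not the JCSP implementation.
class SketchBarrier {

    private int nEnrolled;       // processes currently associated with the barrier
    private int countDown;       // enrolled processes still to sync in this cycle
    private boolean cycle;       // flips each time the barrier completes

    SketchBarrier(int nEnrolled) {
        if (nEnrolled < 0) throw new IllegalArgumentException("nEnrolled < 0");
        this.nEnrolled = nEnrolled;
        this.countDown = nEnrolled;
    }

    synchronized void enroll() { nEnrolled++; countDown++; }

    synchronized void resign() {
        nEnrolled--;
        countDown--;
        if (countDown == 0) complete();        // the resigner was the last one outstanding
    }

    synchronized void sync() throws InterruptedException {
        countDown--;
        if (countDown == 0) { complete(); return; }
        final boolean myCycle = cycle;
        while (myCycle == cycle) wait();       // loop guards against spurious wake-ups
    }

    private void complete() {                  // caller holds the monitor
        countDown = nEnrolled;                 // re-arm for the next cycle
        cycle = !cycle;
        notifyAll();                           // release everyone blocked in sync
    }
}

class ResignDemo {

    // Two processes are enrolled; one syncs, the other resigns instead of syncing.
    static boolean releasedByResign() {
        final SketchBarrier barrier = new SketchBarrier(2);
        final Thread waiter = new Thread(() -> {
            try { barrier.sync(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        waiter.start();
        barrier.resign();                      // whichever order sync and resign run in, the waiter gets through
        try { waiter.join(5000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter released: " + releasedByResign());
    }
}
```

Without the `countDown == 0` check in resign, a waiter that had already called sync would stay blocked until some other process enrolled and synchronised.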
The TimeKeeper is passed its tick interval and the Barrier. It is pre-enrolled with the Barrier and remains permanently associated:

    import org.jcsp.lang.*;

    public class TimeKeeper implements CSProcess {

      private final long interval;
      private final Barrier barrier;

      public TimeKeeper (long interval, Barrier barrier) {
        this.interval = interval;
        this.barrier = barrier;
      }

      public void run () {

        final CSTimer tim = new CSTimer ();
        long timeout = tim.read () + interval;

        while (true) {
          tim.after (timeout);
          barrier.sync ();
          System.out.println ("[" + (tim.read () - timeout) + "]");
          timeout += interval;
        }

      }

    }

The print statement from the TimeKeeper gives an upper bound on how far each timeslot strays from its schedule. JCSP CSTimers are currently implemented on top of standard Java APIs (Thread.sleep and Object.wait). Depending on the underlying JVM, this should stay close to zero (milliseconds) - except when the rogue Worker deliberately overruns a work unit.
Other events may also disturb the schedule - e.g. a Ctl-S/Ctl-Q from
the user to pause/resume output or some transient fit of activity from
the operating system. Some JVMs also return early from some timeouts - i.e. the timeslot
starts early, which gives rise to an occasional negative report from the TimeKeeper.
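The stability claim can be checked with a little arithmetic. Because the TimeKeeper advances an absolute deadline (timeout += interval) rather than sleeping for a relative period, an overrun delays only the slots it overlaps. The sketch below is a simplified model, not part of JCSP: the class ScheduleModel, the method lateness and the work durations are all invented for illustration, and each slot is assumed to start once it is due and the previous slot's work has finished.

```java
import java.util.Arrays;

// Simplified model with made-up numbers; not part of JCSP.
class ScheduleModel {

    // Slot k is due at k*tick. It starts when it is due AND the previous slot's
    // (longest) work unit has finished. Returns how late each slot starts.
    static long[] lateness(long tick, long[] workDurations) {
        final long[] late = new long[workDurations.length];
        long previousFinish = 0;
        for (int k = 0; k < workDurations.length; k++) {
            final long due = k * tick;
            final long start = Math.max(due, previousFinish);
            late[k] = start - due;
            previousFinish = start + workDurations[k];
        }
        return late;
    }

    public static void main(String[] args) {
        final long[] work = { 400, 1500, 300, 200 };   // slot 1 overruns a 1000 ms tick
        System.out.println(Arrays.toString(lateness(1000, work)));   // prints [0, 0, 500, 0]
    }
}
```

Slot 2 starts 500 ms late because of the overrun in slot 1, but its own work is short, so slot 3 is back on the original schedule.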
Bear also in mind that the TimeKeeper's print statement has to compete with the print statements from all working Workers. All are scheduled to execute at the start of each timeslot and may be arbitrarily interleaved. This may be confusing when interpreting the output from the system.
To clarify what's happening, we can arrange for the TimeKeeper's message to be printed first for each timeslot, before any from the Workers. To do this, we need to stall those Workers temporarily until we know that the TimeKeeper has reported. A simple way to do that is to double up on the barrier synchronisation. For the Worker, modify its working loop:

    for (int i = 0; i < nWorkUnits; i++) {
      barrier.sync ();                          // wait for everyone
      barrier.sync ();                          // wait for the Timekeeper to report
      System.out.println (working);
      tim.sleep (random.nextInt (maxWork));
    }

For the TimeKeeper, modify its run loop:

    while (true) {
      tim.after (timeout);
      barrier.sync ();                          // wait for everyone
      System.out.println ("[" + (tim.read () - timeout) + "]");
      barrier.sync ();                          // let the Workers get going
      timeout += interval;
    }

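The effect of doubling up the synchronisation can be tried without JCSP on the classpath. The following sketch uses java.util.concurrent.CyclicBarrier as a stand-in for the JCSP Barrier, with a fixed set of parties and no enroll/resign. DoubleSyncDemo and its log format are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Illustration only: CyclicBarrier stands in for org.jcsp.lang.Barrier.
class DoubleSyncDemo {

    static List<String> run(int nWorkers, int nSlots) {
        final List<String> log = Collections.synchronizedList(new ArrayList<>());
        final CyclicBarrier barrier = new CyclicBarrier(nWorkers + 1);   // workers + time keeper
        final List<Thread> threads = new ArrayList<>();

        threads.add(new Thread(() -> {                  // the time keeper
            for (int slot = 0; slot < nSlots; slot++) {
                await(barrier);                         // wait for everyone
                log.add("[slot " + slot + "]");         // report while the workers are stalled
                await(barrier);                         // let the workers get going
            }
        }));
        for (int w = 0; w < nWorkers; w++) {
            final int id = w;
            threads.add(new Thread(() -> {
                for (int slot = 0; slot < nSlots; slot++) {
                    await(barrier);                     // wait for everyone
                    await(barrier);                     // wait for the time keeper to report
                    log.add("worker " + id + " working");
                }
            }));
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return log;
    }

    private static void await(CyclicBarrier barrier) {
        try { barrier.await(); }
        catch (InterruptedException | BrokenBarrierException e) { throw new IllegalStateException(e); }
    }

    public static void main(String[] args) {
        run(3, 2).forEach(System.out::println);
    }
}
```

In every run the time keeper's line for a slot comes before any worker line for that slot. The order of the worker lines among themselves remains arbitrary.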
Going back to the original example, the entire barrier synchronisation could be discarded by dropping the TimeKeeper and making each Worker responsible for its own time schedule. However, setting n timeouts (where each setting has O(n) overheads) needs to be compared against setting 1 timeout (by the TimeKeeper) together with a (n+1)-way barrier synchronisation.
For the current implementation, the enroll and resign operations - together with most of the syncs - have unit time costs. The final sync, which releases all the other (n) processes blocked on the barrier, takes O(n) time. The unit time costs for this implementation are comparable with those of a synchronized method invocation followed by an Object.wait.
[Note: CSP synchronisation primitives can be implemented with much lighter overheads. For example, the KRoC occam equivalent to this Barrier (its EVENT) has (sub-microsecond) unit time costs for all its operations, including the final sync. Future work on JCSP may look towards this standard.]
See Also: AltingBarrier, Bucket, Alternative, Serialized Form

Constructor Summary | |
---|---|
Barrier() | Construct a barrier initially associated with no processes. |
Barrier(int nEnrolled) | Construct a barrier (initially) associated with nEnrolled processes. |

Method Summary | |
---|---|
void | enroll() - A process may enroll only if it is resigned. |
void | reset(int nEnrolled) - Reset this barrier to be associated with nEnrolled processes. |
void | resign() - A process may resign only if it is enrolled. |
void | sync() - Synchronise the invoking process on this barrier. |

Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Constructor Detail |
---|
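public Barrier()
Construct a barrier initially associated with no processes.

public Barrier(int nEnrolled)
Construct a barrier (initially) associated with nEnrolled processes.
Parameters: nEnrolled - the number of processes (initially) associated with this barrier.
Throws: IllegalArgumentException - if nEnrolled < 0.

Method Detail |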
---|
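public void reset(int nEnrolled)
Reset this barrier to be associated with nEnrolled processes.
Parameters: nEnrolled - the number of processes reset to this barrier.
Throws: IllegalArgumentException - if nEnrolled < 0.

public void sync()
Synchronise the invoking process on this barrier.
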
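public void enroll()
A process may enroll only if it is resigned. A re-enrolled process may resume synchronising on this barrier (until a subsequent resign). Other processes cannot complete the barrier (represented by this front-end) without participation by the re-enrolled process.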
Note: timing re-enrollment on a barrier usually needs some care. If the barrier is being used for synchronising phases of execution between a set of processes, it is crucial that re-enrollment occurs in an appropriate (not arbitrary) phase. If the trigger for re-enrollment comes from another enrolled process, that process should be in such an appropriate phase. The resigned process should re-enroll and, then, acknowledge the trigger. The triggering process should wait for that acknowledgement. If the decision to re-enroll is internal (e.g. following a timeout), a buddy process, enrolled on the barrier, should be asked to provide that trigger when in an appropriate phase. The buddy process, perhaps specially built just for this purpose, polls a service channel for that question when in that phase.
Warning: the rule in the first sentence above is the responsibility of the designer -- it is not checked by implementation. If not honoured, things will go wrong.
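
public void resign()
A process may resign only if it is enrolled. A resigned process may not synchronise on this barrier (until a subsequent enroll). Other processes can complete the barrier (represented by this front-end) without participation by the resigned process.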
Unless all processes synchronising on this barrier terminate in the same phase, it is usually appropriate for a terminating process to resign first. Otherwise, its sibling processes will never be able to complete another synchronisation.
Warning: the rules in the first two sentences above are the responsibility of the designer -- they are not checked by implementation. If not honoured, things will go wrong.
Throws: BarrierError - if not enrolled (but this is not always detected).