CSP for Java
(JCSP) 1.0-rc4

jcsp.lang
Class Barrier

java.lang.Object
  |
  +--jcsp.lang.Barrier
All Implemented Interfaces:
Serializable

public class Barrier
extends Object
implements Serializable

This enables barrier synchronisation between a set of processes.


Description

A channel is a CSP event in which only two processes (the reader and the writer) synchronise. A barrier is a CSP event in which any number of processes may synchronise. Any process synchronising on a barrier will be blocked until all processes associated with that barrier have synchronised. A process may not back off an attempted synchronisation - i.e. barriers cannot be used as guards in an Alternative.

A process network may contain many barriers, each associated with a different subset of processes. These subsets may overlap and change at run-time. JCSP does not currently provide a checked way of associating a process with a particular barrier. That could be done, but it would carry a syntactic and run-time overhead that is currently not thought worthwhile. Associating a barrier with its correct set of processes is a design issue, and it is left to the designer to ensure correct usage.
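To give a feel for what a checked association might involve, here is a minimal sketch of the bookkeeping alone. It is not part of JCSP: the class `EnrollmentRegistry` is hypothetical, and it assumes that each process is run by a single Java thread, so that thread identity can stand in for process identity (this breaks down for a process that runs sub-processes on other threads). It rejects a double enroll and a resign by a non-member.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not part of JCSP: the membership bookkeeping a
// "checked" barrier might do. It assumes one Java thread per process, so the
// current thread stands for the invoking process. It does not block or sync.
class EnrollmentRegistry {

  // Threads currently associated with the barrier; guarded by 'this'.
  private final Set<Thread> members = new HashSet<>();

  // Rejects a second enroll by the same process.
  synchronized void enroll() {
    if (!members.add(Thread.currentThread())) {
      throw new IllegalStateException("already enrolled on this barrier");
    }
  }

  // Rejects a resign by a process that is not enrolled.
  synchronized void resign() {
    if (!members.remove(Thread.currentThread())) {
      throw new IllegalStateException("not enrolled on this barrier");
    }
  }

  synchronized int size() {
    return members.size();
  }
}
```

Every enroll and resign now pays for a hash-set update under a lock, which is the kind of run-time overhead referred to above.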

Note: this notion of barrier corresponds to the EVENT synchronisation primitive added to the KRoC occam language system.

Deterministic Barriers

If the set of processes associated with a barrier remains fixed, barrier synchronisation introduces no non-determinism. So, a parallel system made up of processes, barriers (with fixed barrier sets) and 1-1 channels (with no ALTing on the channels and an exclusive read/write access discipline for all communicated objects) is deterministic - its semantics are independent of scheduling.
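The following sketch illustrates this determinism in plain Java. It uses `java.util.concurrent.CyclicBarrier` as a stand-in for a barrier with a fixed set of parties (it is not the JCSP `Barrier`), and the class `FixedBarrierDemo` is hypothetical. Each party writes only its own slot, and barrier syncs separate every write phase from every read phase, so the final array is the same on every run, whatever the thread scheduling.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo, not JCSP code: CyclicBarrier stands in for a barrier
// with a fixed set of nParties processes.
class FixedBarrierDemo {

  // Each round, party id copies its left neighbour's value into its own slot.
  static int[] run(final int nParties, final int rounds) {
    final CyclicBarrier barrier = new CyclicBarrier(nParties);
    final int[] value = new int[nParties];  // slot id is written only by party id
    final int[] next = new int[nParties];   // slot id is written only by party id
    final Thread[] threads = new Thread[nParties];
    for (int i = 0; i < nParties; i++) {
      final int id = i;
      value[id] = id;
      threads[id] = new Thread(() -> {
        try {
          for (int r = 0; r < rounds; r++) {
            barrier.await();                                   // all writes done: safe to read
            next[id] = value[(id + nParties - 1) % nParties];  // read left neighbour
            barrier.await();                                   // all reads done: safe to write
            value[id] = next[id];
          }
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
      });
    }
    for (Thread t : threads) {
      t.start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    return value;  // join() makes every thread's writes visible here
  }
}
```

Each round shifts every value one slot to the right, so `FixedBarrierDemo.run(4, 5)` always returns {3, 0, 1, 2}.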

For fixed barrier sets, construct each barrier initialised to the number of processes to be associated with it and share it out amongst those processes.

For example, here is a fixed set of 10 processes synchronising on a shared barrier:

              
 [===================================================================] barrier
           |                   |                           |
     ______|______       ______|______               ______|______
    |             |     |             |             |             |
    |  Player(0)  |     |  Player(1)  |     ...     |  Player(9)  |
    |_____________|     |_____________|             |_____________|
 
Here is the JCSP code for this network:
 import jcsp.lang.*;
 
 public class BarrierExample1 {
 
   public static void main (String[] args) {
 
     final int nPlayers = 10;
 
     final Barrier barrier = new Barrier (nPlayers);
 
     final Player[] players = new Player[nPlayers];
     for (int i = 0; i < players.length; i++) {
       players[i] = new Player (i, nPlayers, barrier);
     }
 
     new Parallel (players).run ();
 
   }
 
 }
 
To synchronise on a barrier, a process just needs to invoke its sync method. For example:
 import jcsp.lang.*;
 
 public class Player implements CSProcess {
 
   private final int id, nPlayers;
   private final Barrier barrier;
 
   public Player (int id, int nPlayers, Barrier barrier) {
     this.id = id;
     this.nPlayers = nPlayers;
     this.barrier = barrier;
   }
 
   public void run () {
     final CSTimer tim = new CSTimer ();
     final long second = 1000;          // JCSP timer units are milliseconds
     int busy = id + 1;
     while (true) {
       tim.sleep (busy*second);         // application specific work
       System.out.println ("Player " + id + " at the barrier ...");
       barrier.sync ();
       System.out.println ("\t\t\t... Player " + id + " over the barrier");
       busy = (nPlayers + 1) - busy;    // just to make it more interesting
     }
   }
 
 }
 
The sleep period above represents some work carried out by each Player. This work takes a different amount of time in each cycle and varies from player to player. At the end of each piece of work, each player waits for all its colleagues before continuing its next cycle.
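Since no player can pass the barrier until the slowest has arrived, each cycle lasts as long as the longest piece of work in that cycle. The arithmetic below is plain Java with no JCSP; the class `PlayerSchedule` is hypothetical, and printing and scheduling costs are ignored. It applies the same busy-time update as `Player.run`. Because the busy times in every cycle are a permutation of 1..nPlayers, every cycle lasts nPlayers seconds, and each player spends the remainder of the cycle blocked in `sync`.

```java
// Hypothetical worked arithmetic for the Player example (plain Java, no JCSP).
// Times are in seconds; printing and scheduling overheads are ignored.
class PlayerSchedule {

  // Length of each of the first nCycles cycles: the slowest player's busy time.
  static int[] cycleLengths(int nPlayers, int nCycles) {
    final int[] busy = new int[nPlayers];
    for (int id = 0; id < nPlayers; id++) {
      busy[id] = id + 1;                       // initial value, as in Player.run
    }
    final int[] lengths = new int[nCycles];
    for (int c = 0; c < nCycles; c++) {
      int slowest = 0;
      for (int id = 0; id < nPlayers; id++) {
        slowest = Math.max(slowest, busy[id]);
        busy[id] = (nPlayers + 1) - busy[id];  // same update as Player.run
      }
      lengths[c] = slowest;
    }
    return lengths;
  }

  // Seconds that player id spends blocked in sync during cycle c (0-based).
  // Busy times alternate between id + 1 and nPlayers - id, and every cycle
  // lasts nPlayers seconds, so the wait is the difference.
  static int waitTime(int nPlayers, int id, int c) {
    final int busy = (c % 2 == 0) ? id + 1 : nPlayers - id;
    return nPlayers - busy;
  }
}
```

With 10 players, `cycleLengths(10, 4)` gives {10, 10, 10, 10}, and Player 0 waits 9 seconds at the barrier in its first cycle but none in its second.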

Non-Deterministic Barriers

A process may choose at any time to enroll on, or resign from, any barrier it can see. It should not, of course, enroll on a barrier with which it is already associated, nor resign from a barrier with which it is not! Because these operations are internal choices of individual processes, and because they affect the synchronisation properties of their environment, the resulting system is non-deterministic.
                            ______________
                           |              |
                           |  TimeKeeper  |
                           |______________|
                                   |
 [===================================================================] barrier
           |                   |                           |
     ______|______       ______|______               ______|______
    |             |     |             |             |             |
    |  Worker(0)  |     |  Worker(1)  |     ...     |  Worker(9)  |
    |_____________|     |_____________|             |_____________|
 
In the above example, Worker processes cycle between working and resting states, making their own decisions about when to switch. When working, they enroll in a barrier shared with a TimeKeeper process - when resting, they resign from this barrier. Whilst working and after they have enrolled, they execute a sequence of work units triggered by synchronisations on the barrier.

The TimeKeeper synchronises on the barrier at a regular rate (once per second) and, thus, coordinates the activities of all working Workers. A work unit can only start at the beginning of one of the TimeKeeper's time slots and each Worker can only perform one work unit per time slot. Should any work unit overrun a time slot, subsequent units (for all Workers) will have a late start. However, the system is stable - so long as there is some slack in the system (i.e. units do not generally overrun), the original schedule will be recovered.
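The stability claim can be checked with a small simulation. This is a model, not JCSP code: the class `SlotSchedule` is hypothetical, and it assumes that slot k's deadline is (k + 1) * tick (as in the TimeKeeper, whose first timeout is one interval after it starts), that a slot starts at the later of its deadline and the moment the slowest Worker finishes its previous unit, and that sync and print costs are negligible. Because the TimeKeeper advances an absolute deadline, each slot after a late one recovers tick minus that slot's longest unit.

```java
// Hypothetical model of the TimeKeeper schedule (plain Java, no JCSP).
// Times are in milliseconds.
class SlotSchedule {

  // longestUnit[k] is the longest work unit started in slot k.
  // Returns how late each slot starts relative to its deadline (k + 1) * tick.
  static long[] lateness(long tick, long[] longestUnit) {
    final int nSlots = longestUnit.length + 1;
    final long[] late = new long[nSlots];
    long start = tick;                         // slot 0 starts at its deadline
    for (int k = 1; k < nSlots; k++) {
      final long deadline = (k + 1) * tick;    // TimeKeeper: timeout += interval
      // The barrier completes only when the deadline has passed AND every
      // Worker has finished its previous unit and synced again.
      start = Math.max(deadline, start + longestUnit[k - 1]);
      late[k] = start - deadline;
    }
    return late;
  }
}
```

With a tick of 1000 and longest units {500, 800, 2500, 600, 700, 900, 300, 400}, the lateness is {0, 0, 0, 1500, 1100, 800, 700, 0, 0}: the 2500 ms overrun delays slot 3 by 1500 ms, and the schedule is back on time by slot 7.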

Here is the code for the complete system. The barrier is initialised to just 1, since only the TimeKeeper is permanently associated with it. The barrier is passed to all Workers as well as to the TimeKeeper:

 import jcsp.lang.*;
 
 public class BarrierExample2 {
 
   public static void main (String[] args) {
 
     final int nWorkers = 10;
     final int rogue = 5;
 
     final int second = 1000;           // JCSP timer units are milliseconds
     final int tick = 1*second;
     final int maxWork = tick;          // raise this to allow workers to overrun
 
     final long seed = new CSTimer ().read ();
 
     final Barrier barrier = new Barrier (1);
 
     final TimeKeeper timeKeeper = new TimeKeeper (tick, barrier);
 
     final Worker[] workers = new Worker[nWorkers];
     for (int i = 0; i < workers.length; i++) {
       workers[i] = new Worker (i, i + seed, maxWork, i == rogue, barrier);
     }
 
     new Parallel (
       new CSProcess[] {
         timeKeeper,
         new Parallel (workers)
       }
     ).run ();
 
   }
 
 }
 
As well as the barrier, each Worker is given its id, a (unique) seed for its random number generator, its maximum work unit time and whether it is a rogue. A rogue worker deliberately overruns its last unit of work for each working session to test out the stability of the system:
 import jcsp.lang.*;
 import java.util.*;
 
 public class Worker implements CSProcess {
 
   private final int id;
   private final long seed;
   private final int maxWork;
   private final boolean rogue;
   private final Barrier barrier;
 
   public Worker (int id, long seed, int maxWork,
                  boolean rogue, Barrier barrier) {
     this.id = id;
     this.seed = seed;
     this.maxWork = maxWork;
     this.rogue = rogue;
     this.barrier = barrier;
   }
 
   public void run () {
 
     final Random random = new Random (seed);    // each process gets a different seed
 
     final CSTimer tim = new CSTimer ();
     final int second = 1000;                    // JCSP timer units are milliseconds
 
     final int minRest = 3*second;
     final int maxRest = (id + 10)*second;
     final int nWorkUnits = id + 1;
 
     final String starting = "\tWorker " + id + " starting ...";
     final String  working = "\t\t\t  ... Worker " + id + " working ...";
     final String  resting = "\t\t\t\t\t       ... Worker " + id + " resting ...";
 
     while (true) {
       barrier.enroll ();
       System.out.println (starting);
       for (int i = 0; i < nWorkUnits; i++) {
         barrier.sync ();
         System.out.println (working);           // these lines represent
         tim.sleep (random.nextInt (maxWork));   // one unit of work
       }
       if (rogue) tim.sleep (maxWork);           // try to throw the timekeeper
       barrier.resign ();
       System.out.println (resting);
       tim.sleep (minRest + random.nextInt (maxRest));
     }
   }
 
 }
 
Note that the resign method also performs a (non-blocking) synchronisation on the barrier as well as the resignation. This is crucial: if the resigner were the last process associated with the barrier not to have invoked a sync, its resignation must complete the barrier (as though it had invoked a sync) and release all the remaining associated processes.
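One way to realise these semantics with a Java monitor is sketched below. It is an illustration only, not the JCSP source: the class `SketchBarrier` and its fields are hypothetical. A count of processes still to arrive is re-armed from the enrolled count each time the barrier completes; enroll raises both counts; resign lowers the enrolled count and then arrives without waiting, which is what lets a resigner complete the barrier. A generation counter lets a waiting process distinguish its own cycle's release from a spurious wake-up.

```java
// Hypothetical monitor-based sketch of the enroll/resign/sync semantics
// described above. It is an illustration, not the JCSP implementation.
class SketchBarrier {

  private int nEnrolled;    // processes currently associated with the barrier
  private int countDown;    // processes still to arrive in the current cycle
  private long generation;  // incremented each time the barrier completes

  SketchBarrier(int nEnrolled) {
    this.nEnrolled = nEnrolled;
    this.countDown = nEnrolled;
  }

  // The newcomer joins the current cycle, so it must also arrive.
  synchronized void enroll() {
    nEnrolled++;
    countDown++;
  }

  // Leave the barrier; the departure counts as a non-blocking sync,
  // so a last resigner completes the barrier and releases the others.
  synchronized void resign() {
    nEnrolled--;
    arrive();
  }

  // Block until every enrolled process has synced (or resigned).
  synchronized void sync() throws InterruptedException {
    final long myGeneration = generation;
    if (!arrive()) {
      while (generation == myGeneration) {  // loop guards against spurious wake-ups
        wait();
      }
    }
  }

  // Record one arrival; return true if it completed the barrier.
  private boolean arrive() {
    countDown--;
    if (countDown == 0) {
      countDown = nEnrolled;  // re-arm for the next cycle
      generation++;
      notifyAll();            // wakes every waiter: the O(n) part of the final sync
      return true;
    }
    return false;
  }
}
```

For instance, if one process is blocked in `sync` on a `SketchBarrier(2)` and its only partner calls `resign`, the blocked process is released at once.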

The TimeKeeper is passed its tick interval and the Barrier. It is pre-enrolled with the Barrier and remains permanently associated:

 import jcsp.lang.*;
 
 public class TimeKeeper implements CSProcess {
 
   private final long interval;
   private final Barrier barrier;
 
   public TimeKeeper (long interval, Barrier barrier) {
     this.interval = interval;
     this.barrier = barrier;
   }
 
   public void run () {
 
     final CSTimer tim = new CSTimer ();
     long timeout = tim.read () + interval;
 
     while (true) {
       tim.after (timeout);
       barrier.sync ();
       System.out.println ("[" + (tim.read () - timeout) + "]");
       timeout += interval;
     }
   }
 
 }
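The TimeKeeper advances an absolute deadline (`timeout += interval`) rather than sleeping for `interval` after each wake-up. The simulation below, which is hypothetical plain Java rather than JCSP code (the class `DeadlineDrift` is illustrative), shows why: if each timer wait is assumed to return `jitter[k]` milliseconds late, with every jitter smaller than the interval, relative sleeping accumulates the errors, whereas absolute deadlines keep each tick's lateness down to that tick's own jitter.

```java
// Hypothetical simulation (plain Java, no JCSP) of timer drift. Each timer
// wait is assumed to return jitter[k] ms late, with every jitter < interval.
class DeadlineDrift {

  // Sleep for 'interval' after each wake-up: lateness accumulates.
  static long[] relative(long interval, long[] jitter) {
    final long[] late = new long[jitter.length];
    long now = 0;
    for (int k = 0; k < jitter.length; k++) {
      now += interval + jitter[k];           // the sleep overshoots by jitter[k]
      late[k] = now - (k + 1) * interval;    // compared with the ideal schedule
    }
    return late;
  }

  // Wait for an absolute deadline, then advance it, as TimeKeeper does.
  static long[] absolute(long interval, long[] jitter) {
    final long[] late = new long[jitter.length];
    long timeout = interval;
    for (int k = 0; k < jitter.length; k++) {
      final long now = timeout + jitter[k];  // after(timeout) overshoots by jitter[k]
      late[k] = now - timeout;               // TimeKeeper's report, ignoring the barrier wait
      timeout += interval;                   // the next deadline ignores this lateness
    }
    return late;
  }
}
```

With an interval of 1000 and jitter {2, 3, 1, 4}, `relative` reports {2, 5, 6, 10} while `absolute` reports {2, 3, 1, 4}.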
 
The print statement from the TimeKeeper gives an upper bound on how far each timeslot strays from its schedule. JCSP CSTimers are currently implemented on top of standard Java APIs (Thread.sleep and Object.wait), so, depending on the underlying JVM, this figure should stay close to zero milliseconds, except when the rogue Worker deliberately overruns a work unit. Other events may also disturb the schedule, e.g. a Ctrl-S/Ctrl-Q from the user to pause/resume output, or some transient fit of activity from the operating system. Some JVMs also return early from some timeouts, i.e. the timeslot starts early, which gives rise to an occasional negative report from the TimeKeeper.

Bear in mind also that the TimeKeeper's print statement has to compete with the print statements from all working Workers. All are scheduled to execute at the start of each timeslot and may be arbitrarily interleaved, which can make the output from the system confusing to interpret.

To clarify what is happening, we can arrange for the TimeKeeper's message to be printed first in each timeslot, before any from the Workers. To do this, we need to stall those Workers temporarily until we know that the TimeKeeper has reported. A simple way to do that is to double up on the barrier synchronisation. For the Worker, modify its working loop:

   for (int i = 0; i < nWorkUnits; i++) {
     barrier.sync ();                     // wait for everyone
     barrier.sync ();                     // wait for the TimeKeeper to report
     System.out.println (working);
     tim.sleep (random.nextInt (maxWork));
   }
 
For the TimeKeeper, modify its run loop:
   while (true) {
     tim.after (timeout);
     barrier.sync ();                     // wait for everyone
     System.out.println ("[" + (tim.read () - timeout) + "]");
     barrier.sync ();                     // let the Workers get going
     timeout += interval;
   }
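The effect of the doubled sync can be seen in a sketch that uses `java.util.concurrent.CyclicBarrier` as a stand-in for a barrier with fixed membership (an assumption: it does not model enroll or resign). The class `DoubleSyncDemo` is hypothetical. The leader plays the TimeKeeper and logs "T" between its two syncs, while each worker logs "W" only after the second sync, so every round's log starts with "T" and the whole log is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo, not JCSP code: CyclicBarrier stands in for a barrier
// shared by one leader (the TimeKeeper role) and nWorkers workers.
class DoubleSyncDemo {

  static List<String> run(int nWorkers, int rounds) {
    final CyclicBarrier barrier = new CyclicBarrier(nWorkers + 1);
    final List<String> log = Collections.synchronizedList(new ArrayList<>());
    final List<Thread> threads = new ArrayList<>();
    threads.add(new Thread(() -> {           // the TimeKeeper role
      for (int r = 0; r < rounds; r++) {
        sync(barrier);                       // wait for everyone
        log.add("T");                        // report first
        sync(barrier);                       // let the workers get going
      }
    }));
    for (int i = 0; i < nWorkers; i++) {
      threads.add(new Thread(() -> {         // a Worker role
        for (int r = 0; r < rounds; r++) {
          sync(barrier);                     // wait for everyone
          sync(barrier);                     // wait for the leader to report
          log.add("W");
        }
      }));
    }
    for (Thread t : threads) {
      t.start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    return new ArrayList<>(log);
  }

  // Wraps CyclicBarrier.await so the lambdas need no checked exceptions.
  private static void sync(CyclicBarrier barrier) {
    try {
      barrier.await();
    } catch (InterruptedException | BrokenBarrierException e) {
      throw new RuntimeException(e);
    }
  }
}
```

`DoubleSyncDemo.run(3, 2)` returns [T, W, W, W, T, W, W, W] on every run; with a single sync per round, the position of each "T" among the "W" entries could vary from run to run.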
 

Overheads

Free use of additional synchronisations to gain special control (as in the example above) is only worthwhile if their overheads are not so great as to render that control pointless.

Going back to the original example, the entire barrier synchronisation could be discarded by dropping the TimeKeeper and making each Worker responsible for its own time schedule. However, the cost of setting n timeouts (where each setting has O(n) overheads) needs to be compared against that of setting 1 timeout (by the TimeKeeper) together with an (n+1)-way barrier synchronisation.
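The comparison can be written out per tick. This uses the O(n) cost per timeout setting stated above and the per-operation costs of the current implementation given in the next paragraph (unit-cost non-final syncs, O(n) final sync). It further assumes, which the text does not state, that the O(n) timeout cost arises from n timeouts being pending at once, so that setting a single timeout costs O(1).

```latex
\begin{aligned}
C_{\text{self-timed Workers}} &= n \cdot O(n) = O(n^2) \\
C_{\text{TimeKeeper}} &= \underbrace{O(1)}_{\text{one timeout}}
  + \underbrace{n \cdot O(1)}_{\text{non-final syncs}}
  + \underbrace{O(n)}_{\text{final sync}} = O(n)
\end{aligned}
```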

For the current implementation, the enroll and resign operations - together with most of the syncs - have unit time costs. The final sync, which releases all the other (n) processes blocked on the barrier, takes O(n) time. The unit time costs for this implementation are comparable with those of a synchronized method invocation followed by an Object.wait.

[Note: CSP synchronisation primitives can be implemented with much lighter overheads. For example, the KRoC occam equivalent to this Barrier (its EVENT) has (sub-microsecond) unit time costs for all its operations, including the final sync. Future work on JCSP may look towards this standard.]

Author:
P.H.Welch
See Also:
Bucket, One2OneChannel, Serialized Form

Constructor Summary
Barrier()
          Construct a barrier initially associated with no processes.
Barrier(int nEnrolled)
          Construct a barrier (initially) associated with nEnrolled processes.
 
Method Summary
 void enroll()
          Associate the invoking process with this barrier.
 void reset(int nEnrolled)
          Reset this barrier to be associated with nEnrolled processes.
 void resign()
          Disassociate the invoking process from this barrier.
 void sync()
          Synchronise the invoking process on this barrier.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

Barrier

public Barrier()
Construct a barrier initially associated with no processes.

Barrier

public Barrier(int nEnrolled)
Construct a barrier (initially) associated with nEnrolled processes.
Parameters:
nEnrolled - the number of processes (initially) associated with this barrier.
Method Detail

reset

public void reset(int nEnrolled)
Reset this barrier to be associated with nEnrolled processes. This must only be done at a time when no processes are active on the barrier.
Parameters:
nEnrolled - the number of processes to be associated with this barrier after the reset.

sync

public void sync()
Synchronise the invoking process on this barrier. Any process synchronising on this barrier will be blocked until all processes associated with the barrier have synchronised (or resigned).

enroll

public void enroll()
Associate the invoking process with this barrier.

resign

public void resign()
Disassociate the invoking process from this barrier. Note that if the resigner is the last process associated with the barrier not to have invoked a sync, its resignation completes the barrier (as though it had invoked a sync) and releases all the remaining associated processes.


Submit a bug or feature to jcsp-team@ukc.ac.uk
Version 1.0-rc4 of the JCSP API Specification (Copyright 1997-2000 P.D.Austin and P.H.Welch - All Rights Reserved)
Java is a trademark or registered trademark of Sun Microsystems, Inc. in the US and other countries.