CSP for Java
(JCSP) 1.0-rc4

jcsp.lang
Class Bucket

java.lang.Object
  |
  +--jcsp.lang.Bucket
All Implemented Interfaces:
Serializable

public class Bucket
extends Object
implements Serializable

This enables bucket synchronisation between a set of processes.

Shortcut to the Constructor and Method Summaries.

Description

A bucket is a non-deterministic cousin of a Barrier. A bucket is somewhere to fallInto when a process needs somewhere to park itself. There is no limit on the number of processes that can fallInto a bucket - and all are blocked when they do. Release happens when a process (and it will have to be another process) chooses to flush that bucket. When that happens, all processes in the bucket (which may be none) are rescheduled for execution.

Buckets are a non-deterministic primitive, since the decision to flush is a free (internal) choice of the process concerned and the scheduling of that flush impacts on the semantics. Usually, only one process is given responsibility for flushing a bucket (or set of buckets). Flushing a bucket does not block the flusher.

Note: this notion of bucket corresponds to the BUCKET synchronisation primitive added to the KRoC occam language system.

Implementation Note

The fallInto and flush methods of Bucket are just a re-badging of the wait and notifyAll methods of Object - but without the need to gain a monitor lock and without the need to look out for the wait being interrupted.

Currently, though, this is how they are implemented. Beware that a notifyAll carries an O(n) overhead (where n is the number of processes being notified), since each notified process must regain the monitor lock before it can exit the synchronized region. Future JCSP implementations of Bucket will look to follow occam kernels and reduce the overheads of both fallInto and flush to O(1).

A Simple Example

This consists of 10 workers, one bucket and one flusher:
                                _____________
                               |             |
                               |   Flusher   |
                               |_____________|
                                      |
 [===================================================================] bucket
           |                   |                           |
     ______|______       ______|______               ______|______
    |             |     |             |             |             |
    |  Worker(0)  |     |  Worker(1)  |     ...     |  Worker(9)  |
    |_____________|     |_____________|             |_____________|
 
Here is the code for this network:
 import jcsp.lang.*;
 
 public class BucketExample1 {
 
   public static void main (String[] args) {
 
     final int nWorkers = 10;
 
     final int second = 1000;                 // JCSP timer units are milliseconds
     final int interval = 5*second;
     final int maxWork = 10*second;
 
     final long seed = new CSTimer ().read ();  // for the random number generators
 
     final Bucket bucket = new Bucket ();
 
     final Flusher flusher = new Flusher (interval, bucket);
 
     final Worker[] workers = new Worker[nWorkers];
     for (int i = 0; i < workers.length; i++) {
       workers[i] = new Worker (i, i + seed, maxWork, bucket);
     }
 
     System.out.println ("*** Flusher: interval = " + interval + " milliseconds");
 
     new Parallel (
       new CSProcess[] {
         flusher,
         new Parallel (workers)
       }
     ).run ();
 
   }
 
 }
 
A Worker cycle consists of one shift (which takes a random amount of time) and, then, falling into the bucket:
 import jcsp.lang.*;
 import java.util.*;
 
 public class Worker implements CSProcess {
 
   private final int id;
   private final long seed;
   private final int maxWork;
   private final Bucket bucket;
 
   public Worker (int id, long seed, int maxWork, Bucket bucket) {
     this.id = id;
     this.seed = seed;
     this.maxWork = maxWork;
     this.bucket = bucket;
   }
 
   public void run () {
 
     final Random random = new Random (seed);   // each process gets a different seed
 
     final CSTimer tim = new CSTimer ();
     final int second = 1000;                   // JCSP timer units are milliseconds
 
     final String working = "\t... Worker " + id + " working ...";
     final String falling = "\t\t\t     ... Worker " + id + " falling ...";
     final String flushed = "\t\t\t\t\t\t  ... Worker " + id + " flushed ...";
 
     while (true) {
       System.out.println (working);            // these lines represent
       tim.sleep (random.nextInt (maxWork));    // one unit of work
       System.out.println (falling);
       bucket.fallInto ();
       System.out.println (flushed);
     }
   }
 
 }
 
The Flusher just flushes the bucket at preset time intervals:
import jcsp.lang.*;
 import java.util.*;
 
 public class Flusher implements CSProcess {
 
   private final int interval;
   private final Bucket bucket;
 
   public Flusher (int interval, Bucket bucket) {
     this.interval = interval;
     this.bucket = bucket;
   }
 
   public void run () {
 
     final CSTimer tim = new CSTimer ();
     long timeout = tim.read () + interval;
 
     while (true) {
       tim.after (timeout);
       System.out.println ("*** Flusher: about to flush ...");
       final int n = bucket.flush ();
       System.out.println ("*** Flusher: number flushed = " + n);
       timeout += interval;
     }
   }
 
 }
 

The Flying Dingbats

This consists of many buckets, a single bucket keeper (responsible for flushing the buckets) and flock of Dingbats (who regularly fall into various buckets). Here is the system diagram:
                                ________________
                               |                |
                               |  BucketKeeper  |
                               |________________|
                                       |
 [===================================================================] bucket[0]
            |                    |     |                      |
 [===================================================================] bucket[1]
            |                    |     |                      |
            .                    .     .                      .
            .                    .     .                      .
            .                    .     .                      .
            |                    |     |                      |
 [===================================================================] bucket[19]
            |                    |                            |
     _______|______       _______|______               _______|______
    |              |     |              |             |              |
    |  Dingbat(2)  |     |  Dingbat(3)  |     ...     |  Dingbat(10) |
    |______________|     |______________|             |______________|
 
And here is the network code:
 import jcsp.lang.*;
 
 public class BucketExample2 {
 
   public static void main (String[] args) {
 
     final int minDingbat = 2;
     final int maxDingbat = 10;
     final int nDingbats = (maxDingbat - minDingbat) + 1;
 
     final int nBuckets = 2*maxDingbat;
 
     final Bucket[] bucket = Bucket.create (nBuckets);
 
     final int second = 1000;     // JCSP timer units are milliseconds
     final int tick = second;
     final BucketKeeper bucketKeeper = new BucketKeeper (tick, bucket);
 
     final Dingbat[] dingbats = new Dingbat[nDingbats];
     for (int i = 0; i < dingbats.length; i++) {
       dingbats[i] = new Dingbat (i + minDingbat, bucket);
     }
 
     new Parallel (
       new CSProcess[] {
         bucketKeeper,
         new Parallel (dingbats)
       }
     ).run ();
 
   }
 
 }
 
The BucketKeeper keeps time, flushing buckets in sequence at a steady rate. When the last one has been flushed, it starts again with the first:
 import jcsp.lang.*;
 
 class BucketKeeper implements CSProcess {
 
   private final long interval;
   private final Bucket[] bucket;
 
   public BucketKeeper (long interval, Bucket[] bucket) {
     this.interval = interval;
     this.bucket = bucket;
   }
 
   public void run () {
 
     String[] spacer = new String[bucket.length];
     spacer[0] = "";
     for (int i = 1; i < spacer.length; i++) {
       spacer[i] = spacer[i - 1] + "  ";
     }
 
     final CSTimer tim = new CSTimer ();
     long timeout = tim.read ();
     int index = 0;
 
     while (true) {
       final int n = bucket[index].flush ();
       if (n == 0) {
         System.out.println (spacer[index] + "*** bucket " +
                             index + " was empty ...");
       }
       index = (index + 1) % bucket.length;
       timeout += interval;
       tim.after (timeout);
     }
   }
 
 }
 
So the buckets represent time values. A process falling into one of them will sleep until the prescribed time when the BucketKeeper next flushes it.

Dingbats live the following cycle. First, they do some work (rather brief in the following code). Then, they work out which bucket to fall into and fall into it - that is all. In this case, Dingbats just fly on id buckets from whence they were just flushed (where id is the Dingbat number indicated in the above network diagram):

 import jcsp.lang.*;
 
 public class Dingbat implements CSProcess {
 
   private final int id;
   private final Bucket[] bucket;
 
   public Dingbat (int id, Bucket[] bucket) {
     this.id = id;
     this.bucket = bucket;
   }
 
   public void run () {
 
     int logicalTime = 0;
 
     String[] spacer = new String[bucket.length];
     spacer[0] = "";
     for (int i = 1; i < spacer.length; i++) {
       spacer[i] = spacer[i - 1] + "  ";
     }
 
     String message = "Hello world from " + id + " ==> time = ";
 
     while (true) {
       logicalTime += id;
       final int slot = logicalTime % bucket.length;     // assume: id <= bucket.length
       bucket[slot].fallInto ();
       System.out.println (spacer[slot] + message + logicalTime);   // one unit of work
     }
   }
 
 }
 

Danger - Race Hazard

This example contains a race hazard whose elimination is left as an exercise for the reader. The problem is to ensure that all flushed Dingbats have settled in their next chosen buckets before the BucketKeeper next flushes it. If a Dingbat is a bit slow, it may fall into its chosen bucket too late - the BucketKeeper has already flushed it and the creature will have to remain there until the next cycle.

With the rate of flushing used in the above system, this is unlikely to happen. But it is just possible - if something suspended execution of the system for a few seconds immediately following a flush, then the BucketKeeper could be rescheduled before the flying Dingbats.

Acknowledgement

This example is from a discrete event modelling approach due to Jon Kerridge (Napier University, Scotland). The Dingbats could easilly model their own timeouts for themselves. However, setting a timeout is an O(n) operation (where n is the number of processes setting them). Here, there is only one process setting timeouts (the BucketKeeper) and the bucket operations fallInto and flush have O(1) costs (at least, that is the case for occam).

Of course, removal of the above race hazard means that the timeout by the BucketKeeper can also be eliminated. The buckets can be flushed just as soon as it knows that the previously flushed Dingbats have settled. In this way, the event model can proceed at full speed, maintaining correct simulated time - each bucket representing one time unit - without needing any timeouts in the simulation itself.

Author:
P.H.Welch
See Also:
Barrier, Serialized Form

Constructor Summary
Bucket()
           
 
Method Summary
static Bucket[] create(int n)
          Creates an array of Buckets.
 void fallInto()
          Fall into the bucket.
 int flush()
          Flush the bucket.
 int holding()
          This returns the number of processes currently held in the bucket.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

Bucket

public Bucket()
Method Detail

fallInto

public void fallInto()
Fall into the bucket. The process doing this will be blocked until the next flush().

flush

public int flush()
Flush the bucket. All held processes will be released. It returns the number that were released.

Returns:
the number of processes flushed.

holding

public int holding()
This returns the number of processes currently held in the bucket. Note that this number is volatile - for information only! By the time the invoker of this method receives it, it might have changed (because of further processes falling into the bucket or someone flushing it).

Returns:
the number of processes currently held in the bucket.

create

public static Bucket[] create(int n)
Creates an array of Buckets.

Parameters:
n - the number of Buckets to create in the array
Returns:
the array of Buckets

CSP for Java
(JCSP) 1.0-rc4

Submit a bug or feature to jcsp-team@ukc.ac.uk
Version 1.0-rc4 of the JCSP API Specification (Copyright 1997-2000 P.D.Austin and P.H.Welch - All Rights Reserved)
Java is a trademark or registered trademark of Sun Microsystems, Inc. in the US and other countries.