CSP for Java (JCSP) 1.0-rc7 |
java.lang.Object
  |
  +--jcsp.lang.Bucket
This enables bucket synchronisation between a set of processes.
A bucket is a non-deterministic relative of a Barrier. A bucket is somewhere to fallInto when a process needs somewhere to park itself. There is no limit on the number of processes that can fallInto a bucket, and all are blocked when they do. Release happens when a process (necessarily a different one) chooses to flush that bucket. When that happens, all processes in the bucket (which may be none) are rescheduled for execution.
Buckets are a non-deterministic primitive, since the decision to flush is a free (internal) choice of the process concerned and the scheduling of that flush impacts on the semantics. Usually, only one process is given responsibility for flushing a bucket (or set of buckets). Flushing a bucket does not block the flusher.
Note: this notion of bucket corresponds to the BUCKET synchronisation primitive added to the KRoC occam language system.
The fallInto and flush methods of Bucket are just a re-badging of the wait and notifyAll methods of Object - but without the need to gain a monitor lock and without the need to look out for the wait being interrupted.
Currently, though, this is how they are implemented. Beware that a notifyAll carries an O(n) overhead (where n is the number of processes being notified), since each notified process must regain the monitor lock before it can exit the synchronized region. Future JCSP implementations of Bucket will look to follow occam kernels and reduce the overheads of both fallInto and flush to O(1).
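The wait/notifyAll relationship described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the JCSP source: the class names SketchBucket and SketchBucketDemo, the generation counter (used to guard against spurious wakeups) and the way interrupts are deferred are all choices made for this sketch.

```java
/** Hypothetical stand-in for a bucket, built on Object.wait/notifyAll. Not the JCSP source. */
class SketchBucket {

    private int holding = 0;       // number of threads currently parked
    private long generation = 0;   // incremented by every flush

    /** Parks the calling thread until some other thread calls flush(). */
    public synchronized void fallInto() {
        final long myGeneration = generation;
        holding++;
        boolean interrupted = false;
        while (generation == myGeneration) {   // loop guards against spurious wakeups
            try {
                wait();                        // releases the monitor while parked
            } catch (InterruptedException e) {
                interrupted = true;            // keep waiting; restore the flag on exit
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Releases every parked thread and returns how many there were. Never blocks on the bucket. */
    public synchronized int flush() {
        final int n = holding;
        holding = 0;
        generation++;
        notifyAll();   // each woken thread must regain the monitor: the O(n) cost noted above
        return n;
    }

    /** Number of threads currently parked. */
    public synchronized int holding() {
        return holding;
    }
}

class SketchBucketDemo {

    /** Parks nWorkers threads in one bucket, flushes once, and returns the flush count. */
    static int run(int nWorkers) {
        final SketchBucket bucket = new SketchBucket();
        final Thread[] workers = new Thread[nWorkers];
        for (int i = 0; i < nWorkers; i++) {
            workers[i] = new Thread(bucket::fallInto);
            workers[i].start();
        }
        while (bucket.holding() < nWorkers) {
            Thread.yield();                    // wait until every worker has fallen in
        }
        final int n = bucket.flush();
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println("flushed = " + run(3));   // prints "flushed = 3"
    }
}
```

Flushing an empty SketchBucket simply returns 0, matching the remark that the set of processes in a bucket may be empty when it is flushed.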
                          _____________
                         |             |
                         |   Flusher   |
                         |_____________|
                                |
  [===================================================================] bucket
         |                      |                          |
   ______|______          ______|______              ______|______
  |             |        |             |            |             |
  |  Worker(0)  |        |  Worker(1)  |    ...     |  Worker(9)  |
  |_____________|        |_____________|            |_____________|

Here is the code for this network:
import jcsp.lang.*;

public class BucketExample1 {

  public static void main (String[] args) {

    final int nWorkers = 10;

    final int second = 1000;                   // JCSP timer units are milliseconds
    final int interval = 5*second;
    final int maxWork = 10*second;

    final long seed = new CSTimer ().read ();  // for the random number generators

    final Bucket bucket = new Bucket ();

    final Flusher flusher = new Flusher (interval, bucket);

    final Worker[] workers = new Worker[nWorkers];
    for (int i = 0; i < workers.length; i++) {
      workers[i] = new Worker (i, i + seed, maxWork, bucket);
    }

    System.out.println ("*** Flusher: interval = " + interval + " milliseconds");

    new Parallel (
      new CSProcess[] {
        flusher,
        new Parallel (workers)
      }
    ).run ();

  }

}

A Worker cycle consists of one shift (which takes a random amount of time) and, then, falling into the bucket:
import jcsp.lang.*;
import java.util.*;

public class Worker implements CSProcess {

  private final int id;
  private final long seed;
  private final int maxWork;
  private final Bucket bucket;

  public Worker (int id, long seed, int maxWork, Bucket bucket) {
    this.id = id;
    this.seed = seed;
    this.maxWork = maxWork;
    this.bucket = bucket;
  }

  public void run () {

    final Random random = new Random (seed);   // each process gets a different seed

    final CSTimer tim = new CSTimer ();
    final int second = 1000;                   // JCSP timer units are milliseconds

    final String working = "\t... Worker " + id + " working ...";
    final String falling = "\t\t\t ... Worker " + id + " falling ...";
    final String flushed = "\t\t\t\t\t\t ... Worker " + id + " flushed ...";

    while (true) {
      System.out.println (working);             // these lines represent
      tim.sleep (random.nextInt (maxWork));     // one unit of work
      System.out.println (falling);
      bucket.fallInto ();
      System.out.println (flushed);
    }

  }

}

The Flusher just flushes the bucket at preset time intervals:
import jcsp.lang.*;
import java.util.*;

public class Flusher implements CSProcess {

  private final int interval;
  private final Bucket bucket;

  public Flusher (int interval, Bucket bucket) {
    this.interval = interval;
    this.bucket = bucket;
  }

  public void run () {

    final CSTimer tim = new CSTimer ();
    long timeout = tim.read () + interval;

    while (true) {
      tim.after (timeout);
      System.out.println ("*** Flusher: about to flush ...");
      final int n = bucket.flush ();
      System.out.println ("*** Flusher: number flushed = " + n);
      timeout += interval;
    }

  }

}
                                      ________________
                                     |                |
                                     |  BucketKeeper  |
                                     |________________|
                                             |
  [===================================================================] bucket[0]
          |                      |           |              |
  [===================================================================] bucket[1]
          |                      |           |              |
          .                      .           .              .
          .                      .           .              .
          .                      .           .              .
          |                      |           |              |
  [===================================================================] bucket[19]
          |                      |                          |
   _______|______         _______|______             _______|______
  |              |       |              |           |              |
  |  Dingbat(2)  |       |  Dingbat(3)  |    ...    |  Dingbat(10) |
  |______________|       |______________|           |______________|

And here is the network code:
import jcsp.lang.*;

public class BucketExample2 {

  public static void main (String[] args) {

    final int minDingbat = 2;
    final int maxDingbat = 10;
    final int nDingbats = (maxDingbat - minDingbat) + 1;

    final int nBuckets = 2*maxDingbat;

    final Bucket[] bucket = Bucket.create (nBuckets);

    final int second = 1000;                   // JCSP timer units are milliseconds
    final int tick = second;
    final BucketKeeper bucketKeeper = new BucketKeeper (tick, bucket);

    final Dingbat[] dingbats = new Dingbat[nDingbats];
    for (int i = 0; i < dingbats.length; i++) {
      dingbats[i] = new Dingbat (i + minDingbat, bucket);
    }

    new Parallel (
      new CSProcess[] {
        bucketKeeper,
        new Parallel (dingbats)
      }
    ).run ();

  }

}

The BucketKeeper keeps time, flushing buckets in sequence at a steady rate. When the last one has been flushed, it starts again with the first:
import jcsp.lang.*;

class BucketKeeper implements CSProcess {

  private final long interval;
  private final Bucket[] bucket;

  public BucketKeeper (long interval, Bucket[] bucket) {
    this.interval = interval;
    this.bucket = bucket;
  }

  public void run () {

    String[] spacer = new String[bucket.length];
    spacer[0] = "";
    for (int i = 1; i < spacer.length; i++) {
      spacer[i] = spacer[i - 1] + " ";
    }

    final CSTimer tim = new CSTimer ();
    long timeout = tim.read ();
    int index = 0;

    while (true) {
      final int n = bucket[index].flush ();
      if (n == 0) {
        System.out.println (spacer[index] + "*** bucket " + index + " was empty ...");
      }
      index = (index + 1) % bucket.length;
      timeout += interval;
      tim.after (timeout);
    }

  }

}

So the buckets represent time values. A process falling into one of them will sleep until the prescribed time when the BucketKeeper next flushes it.
Dingbats live the following cycle. First, they do some work (rather brief in the following code). Then, they work out which bucket to fall into and fall into it - that is all. In this case, each Dingbat simply moves on id buckets from the one from which it was just flushed (where id is the Dingbat number shown in the network diagram above):
import jcsp.lang.*;

public class Dingbat implements CSProcess {

  private final int id;
  private final Bucket[] bucket;

  public Dingbat (int id, Bucket[] bucket) {
    this.id = id;
    this.bucket = bucket;
  }

  public void run () {

    int logicalTime = 0;

    String[] spacer = new String[bucket.length];
    spacer[0] = "";
    for (int i = 1; i < spacer.length; i++) {
      spacer[i] = spacer[i - 1] + " ";
    }

    String message = "Hello world from " + id + " ==> time = ";

    while (true) {
      logicalTime += id;
      final int slot = logicalTime % bucket.length;   // assume: id <= bucket.length
      bucket[slot].fallInto ();
      System.out.println (spacer[slot] + message + logicalTime);   // one unit of work
    }

  }

}
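The slot arithmetic used by the Dingbats (logicalTime % bucket.length) makes the bucket array behave like a circular time wheel. The following single-threaded sketch models that schedule so the pattern of flushes can be inspected without running the concurrent system. It is an idealisation, assuming every flushed Dingbat has settled into its next bucket before the next flush; the class TimeWheelSketch and its method flushCounts are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sequential model of the BucketKeeper/Dingbat schedule. */
class TimeWheelSketch {

    /**
     * Returns, for each tick 0..nTicks-1, how many Dingbats would be released
     * when the keeper flushes bucket (tick % nBuckets).
     * Assumes 1 <= minId <= maxId <= nBuckets.
     */
    static int[] flushCounts(int minId, int maxId, int nBuckets, int nTicks) {
        // wheel.get(slot) holds the ids of the Dingbats parked in that bucket
        final List<List<Integer>> wheel = new ArrayList<>();
        for (int i = 0; i < nBuckets; i++) {
            wheel.add(new ArrayList<>());
        }

        // Every Dingbat advances its logical time by id and falls into that slot
        final int[] logicalTime = new int[maxId + 1];
        for (int id = minId; id <= maxId; id++) {
            logicalTime[id] = id;
            wheel.get(id % nBuckets).add(id);
        }

        final int[] counts = new int[nTicks];
        for (int tick = 0; tick < nTicks; tick++) {
            final int slot = tick % nBuckets;
            final List<Integer> released = wheel.get(slot);
            wheel.set(slot, new ArrayList<>());          // the flush empties the bucket
            counts[tick] = released.size();
            for (int id : released) {                    // released Dingbats move on by id
                logicalTime[id] += id;
                wheel.get(logicalTime[id] % nBuckets).add(id);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        final int[] counts = flushCounts(2, 10, 20, 25);
        for (int tick = 0; tick < counts.length; tick++) {
            System.out.println("tick " + tick + ": flushed " + counts[tick]);
        }
    }
}
```

With ids 2 to 10 and 20 buckets, as in the example network, the count at any tick after the first is the number of ids that divide the tick number, so a tick such as 11 releases nobody while tick 12 releases four Dingbats (ids 2, 3, 4 and 6).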
With the rate of flushing used in the above system, a race between the BucketKeeper and the Dingbats it has just flushed is unlikely to cause trouble. But it is just possible: if something suspended execution of the system for a few seconds immediately following a flush, the BucketKeeper could be rescheduled, and flush the next bucket, before the flying Dingbats had settled into their chosen buckets.
Of course, removal of the above race hazard means that the timeout by the BucketKeeper can also be eliminated. The buckets can be flushed just as soon as it knows that the previously flushed Dingbats have settled. In this way, the event model can proceed at full speed, maintaining correct simulated time - each bucket representing one time unit - without needing any timeouts in the simulation itself.
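One possible way to let the keeper know that the flushed Dingbats have settled is sketched below in plain Java. The text does not say how this should be done, so everything here is an assumption: the keeper is taken to know the number of Dingbats, it polls the buckets until they hold all of them, and ParkingBucket is a hypothetical stand-in built on wait/notifyAll rather than the JCSP Bucket (with JCSP, the polling would presumably use holding()).

```java
/** Hypothetical stand-in for a bucket (wait/notifyAll based); not the JCSP class. */
class ParkingBucket {
    private int holding = 0;
    private long generation = 0;

    synchronized void fallInto() {
        final long mine = generation;
        holding++;
        while (generation == mine) {
            try {
                wait();
            } catch (InterruptedException e) {
                // ignored in this sketch: nothing interrupts these threads
            }
        }
    }

    synchronized int flush() {
        final int n = holding;
        holding = 0;
        generation++;
        notifyAll();
        return n;
    }

    synchronized int holding() {
        return holding;
    }
}

class SettledKeeperSketch {

    /** Sums holding() over all buckets; a lower bound on the parked Dingbats. */
    static int parked(ParkingBucket[] bucket) {
        int total = 0;
        for (ParkingBucket b : bucket) {
            total += b.holding();
        }
        return total;
    }

    /** Runs nTicks flushes at full speed and returns the number released at each tick. */
    static int[] run(int minId, int maxId, int nTicks) {
        final int nDingbats = maxId - minId + 1;
        final ParkingBucket[] bucket = new ParkingBucket[2 * maxId];
        for (int i = 0; i < bucket.length; i++) {
            bucket[i] = new ParkingBucket();
        }
        for (int id = minId; id <= maxId; id++) {
            final int step = id;
            final Thread dingbat = new Thread(() -> {
                int logicalTime = 0;
                while (true) {
                    logicalTime += step;
                    bucket[logicalTime % bucket.length].fallInto();
                }
            });
            dingbat.setDaemon(true);   // left parked when the run ends
            dingbat.start();
        }
        final int[] counts = new int[nTicks];
        for (int tick = 0; tick < nTicks; tick++) {
            // No timeout: flush as soon as every Dingbat is known to be parked.
            while (parked(bucket) < nDingbats) {
                Thread.yield();
            }
            counts[tick] = bucket[tick % bucket.length].flush();
        }
        return counts;
    }

    public static void main(String[] args) {
        final int[] counts = run(2, 10, 25);
        for (int tick = 0; tick < counts.length; tick++) {
            System.out.println("tick " + tick + ": flushed " + counts[tick]);
        }
    }
}
```

Because only the keeper flushes, the bucket counts can only grow while it is polling, so a total equal to the number of Dingbats means all of them have settled. Busy polling is used only to keep the sketch short; a blocking synchronisation would be kinder to the scheduler.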
See Also: Barrier, Serialized Form

Constructor Summary
Bucket()

Method Summary
static Bucket[] | create(int n) | Creates an array of Buckets.
void | fallInto() | Fall into the bucket.
int | flush() | Flush the bucket.
int | holding() | This returns the number of processes currently held in the bucket.

Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
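Constructor Detail
public Bucket()

Method Detail
public void fallInto()
Fall into the bucket. The calling process is blocked until another process calls flush().
public int flush()
Flush the bucket, releasing all processes currently held in it. Returns the number of processes flushed.
public int holding()
This returns the number of processes currently held in the bucket.
public static Bucket[] create(int n)
Creates an array of Buckets.
n - the number of Buckets to create in the array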