Java Threads mailing list archive
Re: A CSP model for Java threads
From: Jeremy Martin <jeremy.martin@computing-services.oxford.ac.uk>
Date: Fri, 30 Jul 1999 15:46:55 +0100 (BST)
Dear Peter,
Here is an FDR analysis of your model for Java threads, which shows that
the JCSP channel is equivalent to something very simple in CSP. I found
a couple of very minor bugs in your initial CSP specification, but
otherwise I stayed true to your definition.
I'm copying this to the mailing lists for wider discussion.
Jeremy
--
-- FDR analysis of Peter Welch's CSP model of Java threads AND
-- the JCSP implementation of a channel.
--
-- Jeremy Martin 30th July 1999
--
--
-- We shall model a system consisting of a set of Java objects and threads.
--
Objects = {0}
Threads = {0,1}
--
-- We define a collection of channels to model the synchronisation
-- events.
--
channel claim, release, waita, waitb, notify, notifyall: Objects.Threads
--
-- System processes: LOCK(o) controls locking of object o's monitor and
-- SWAIT controls its auxiliary pool of waiting threads ts.
--
LOCK(o) = claim.o?t -> LOCKED(o,t)
LOCKED(o,t) = release.o.t -> LOCK(o)
              []
              notify.o.t -> LOCKED(o,t)
              []
              notifyall.o.t -> LOCKED(o,t)
alphaLOCK(o) = { claim.o.t, release.o.t, notify.o.t, notifyall.o.t
               | t <- Threads }
--
-- A notify event results in one thread being non-deterministically
-- selected from the waiting pool and reactivated; a notifyall
-- event results in all the threads being activated in some
-- non-deterministic order (changed from PHW's original which was
-- deterministic).
--
SWAIT(o,ts) =
  let
    n = card(ts)
  within
    (
      waita.o?t -> SWAIT(o,union(ts, {t}))
    ) [] (
      notify.o?t ->
        if (n > 0) then
          |~| s:ts @ waitb.o!s -> SWAIT(o,diff(ts,{s}))
        else
          SWAIT(o,{})
    ) [] (
      notifyall.o?t -> RELEASE(o,ts)
    )
RELEASE(o,ts) =
  let
    n = card(ts)
  within
    if (n > 0) then
      |~| t:ts @ waitb.o!t -> RELEASE(o,diff(ts,{t}))
    else
      SWAIT(o,{})
alphaSWAIT(o) = {
  waita.o.t, waitb.o.t, notify.o.t, notifyall.o.t
  | t <- Threads
}
--
-- The monitor is defined as the parallel composition of the two system
-- processes. Initially the auxiliary wait queue is empty.
--
MONITOR(o) = SWAIT(o,{}) [ alphaSWAIT(o) || alphaLOCK(o) ] LOCK(o)
alphaMONITOR(o) = union(alphaSWAIT(o),alphaLOCK(o))
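--
-- As a quick sanity check on the monitor alone (an addition, not part of
-- the original analysis), one could ask FDR to confirm that it can never
-- deadlock or diverge. This is a sketch assuming FDR2's property-assertion
-- syntax. Both checks would be expected to succeed: SWAIT and RELEASE can
-- always perform a waita or waitb event without LOCK's cooperation, and
-- nothing is hidden.
--
assert MONITOR(0) :[deadlock free [F]]
assert MONITOR(0) :[divergence free]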
--
-- Now we define the standard programmer's interface to monitors.
-- Note that the WAIT procedure has been modified from Peter's original
-- CSP definition. A thread needs to join the wait queue, by issuing
-- a waita event, prior to releasing the monitor; otherwise it might miss
-- being notified. This was revealed by FDR in an early run.
--
-- For the moment we'll prohibit multiple locks by a particular thread
-- on a particular object. This could easily be handled by using processes
-- to represent the relevant lock counts (as Peter has shown).
--
STARTSYNC(o,me) = claim.o!me -> SKIP
ENDSYNC(o,me) = release.o!me -> SKIP
WAIT(o,me) = waita.o.me -> release.o!me -> waitb.o!me -> claim.o!me -> SKIP
NOTIFY(o,me) = notify.o.me -> SKIP
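--
-- The notifyall channel is declared above but has no matching procedure
-- in this interface. A hypothetical NOTIFYALL, following the same pattern
-- as NOTIFY, might look like this (the name is an assumption; it is not
-- part of the original script and is not used by the JCSP channel below).
--
NOTIFYALL(o,me) = notifyall.o.me -> SKIP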
--
-- JCSP Implementation of channel:
--
-- This is translated from "P.H.Welch, Java Threads in the light of
-- occam/CSP, Proceedings of WoTUG21 1998", page 274.
--
-- The JCSP channel object has three variables (channel_empty, channel_hold
-- and local), which we shall model as processes always ready both to
-- have their value reset and to report it to any willing thread. We
-- assume, for simplicity, that each channel has the same `boolean' type.
--
datatype Variables = channel_empty | channel_hold | local
datatype Data = TRUE | FALSE | OTHER
channel getvar, setvar: Objects.Variables.Threads.Data
--
-- Variables are initialised as TRUE.
--
VARIABLE(o,v) = VARIABLE2(o,v,TRUE)
VARIABLE2(o,v,d) =
  (
    [] t:Threads @ getvar.o.v.t!d -> VARIABLE2(o,v,d)
  ) [] (
    [] t:Threads @ setvar.o.v.t?x -> VARIABLE2(o,v,x)
  )
alphaVARIABLE(o,v) = {
  getvar.o.v.t.d, setvar.o.v.t.d
  | t <- Threads, d <- Data
}
VARIABLES(o) =
  (
    VARIABLE(o,channel_empty)
      [alphaVARIABLE(o,channel_empty) || alphaVARIABLE(o,channel_hold)]
    VARIABLE(o,channel_hold)
  )
    [union(alphaVARIABLE(o,channel_empty),alphaVARIABLE(o,channel_hold))
     || alphaVARIABLE(o,local)]
  VARIABLE(o,local)
alphaVARIABLES(o) = union(union(alphaVARIABLE(o,channel_empty),
                                alphaVARIABLE(o,channel_hold)),
                          alphaVARIABLE(o,local))
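--
-- The nested composition above could also be written with CSPM's
-- replicated alphabetised parallel operator, which scales more easily if
-- further variables are added. VARIABLES_ALT is a hypothetical name
-- introduced here for illustration; the pair of assertions sketches how
-- one might ask FDR to check that the two forms agree.
--
VARIABLES_ALT(o) = || v:Variables @ [alphaVARIABLE(o,v)] VARIABLE(o,v)
assert VARIABLES(0) [FD= VARIABLES_ALT(0)
assert VARIABLES_ALT(0) [FD= VARIABLES(0)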
--
-- One purpose of JCSP is to seal off the thread/monitor synchronisation
-- calls from the programmer. Instead a read/write interface is provided
-- by two simple methods.
--
-- We shall model this interface with the following events:
--   write.o.t.d  - thread t activates Java method write(d) of object o;
--                  message d is supplied for transmission.
--   ack.o.t      - call to write(d) terminates.
--   ready.o.t'   - thread t' activates method read() of object o.
--   read.o.t'.d  - call to read() terminates; message d is returned.
--
-- The JCSP channel should behave like a synchronised channel. Each
-- successful communication requires that at some point both threads
-- are simultaneously involved.
--
channel read, write: Objects.Threads.Data
channel ready, ack: Objects.Threads
--
-- We model the JCSP read method as a process which repeatedly waits to be
-- activated by a ready event and then executes the monitor synchronisation
-- code to receive a message.
--
READ(o,t) = ready.o.t ->
            claim.o.t ->
            getvar.o.channel_empty.t?c ->
            (
              if (c == TRUE) then
                setvar.o.channel_empty.t!FALSE ->
                WAIT(o,t);
                getvar.o.channel_hold.t?mess ->
                setvar.o.local.t!mess ->
                NOTIFY(o,t)
              else
                setvar.o.channel_empty.t!TRUE ->
                getvar.o.channel_hold.t?mess ->
                setvar.o.local.t!mess ->
                NOTIFY(o,t)
            ) ;
            getvar.o.local.t?mess ->
            release.o.t ->
            read.o.t!mess ->
            READ(o,t)
alphaREAD(o,t) = {
  claim.o.t, getvar.o.v.t.d, notify.o.t, notifyall.o.t,
  setvar.o.v.t.d, read.o.t.d, release.o.t, waita.o.t,
  waitb.o.t, ready.o.t
  | v <- Variables, d <- Data
}
--
-- The JCSP write method is similarly modelled as a repeating process,
-- activated by the write event.
--
WRITE(o,t) = write.o.t?mess ->
             claim.o.t ->
             setvar.o.channel_hold.t!mess ->
             getvar.o.channel_empty.t?c ->
             (
               if (c == TRUE) then
                 setvar.o.channel_empty.t!FALSE ->
                 WAIT(o,t)
               else
                 setvar.o.channel_empty.t!TRUE ->
                 NOTIFY(o,t);
                 WAIT(o,t)
             ) ;
             release.o.t ->
             ack.o.t ->
             WRITE(o,t)
alphaWRITE(o,t) = {
  claim.o.t, getvar.o.v.t.d, notify.o.t, notifyall.o.t,
  setvar.o.v.t.d, write.o.t.d, release.o.t, waita.o.t,
  waitb.o.t, ack.o.t
  | v <- Variables, d <- Data
}
--
-- The JCSP channel is then modelled as the parallel composition of
-- processes which model its monitor, three variables, and read and write
-- methods.
--
JCSPCHANNEL(o,t1,t2) =
  (
    READ(o,t1)
      [alphaREAD(o,t1) || alphaWRITE(o,t2)]
    WRITE(o,t2)
  )
    [union(alphaREAD(o,t1),alphaWRITE(o,t2)) ||
     union(alphaMONITOR(o),alphaVARIABLES(o))]
  (
    MONITOR(o)
      [alphaMONITOR(o) || alphaVARIABLES(o)]
    VARIABLES(o)
  )
alphaJCSPCHANNEL(o,t1,t2) =
  union(union(alphaMONITOR(o),alphaVARIABLES(o)),
        union(alphaREAD(o,t1),alphaWRITE(o,t2)))
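--
-- Before comparing the implementation against a specification, a direct
-- check that the unhidden composition cannot deadlock may be useful. This
-- assertion is an addition to the original script, a sketch assuming
-- FDR2's property-assertion syntax and the two-thread system
-- Threads = {0,1} defined at the top.
--
assert JCSPCHANNEL(0,0,1) :[deadlock free [F]]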
--
-- Now we shall define a simplified model of how the JCSP channel
-- should work, and then, hopefully, we shall use FDR to show that this is
-- equivalent to the JCSP implementation.
--
-- The simple channel consists of two parallel processes, LEFT and RIGHT,
-- to handle input and output respectively. The processes are joined
-- by a hidden channel transmit.
--
channel transmit:Objects.Data
LEFT(o,t)   = write.o.t?mess -> transmit.o!mess -> ack.o.t -> LEFT(o,t)
RIGHT(o,t') = ready.o.t' -> transmit.o?mess -> read.o.t'!mess -> RIGHT(o,t')
alphaLEFT(o,t)   = {write.o.t.m, transmit.o.m, ack.o.t | m <- Data}
alphaRIGHT(o,t') = {ready.o.t', transmit.o.m, read.o.t'.m | m <- Data}
CHANNEL(o,t,t') =
  (
    LEFT(o,t')
      [alphaLEFT(o,t') || alphaRIGHT(o,t)]
    RIGHT(o,t)
  ) \ {transmit.o.m | m <- Data}
alphaCHANNEL(o,t,t') =
  {write.o.t'.m, ack.o.t', ready.o.t, read.o.t.m | m <- Data}
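--
-- The specification itself can be sanity-checked before it is used as a
-- yardstick. The assertions below are an addition (again assuming FDR2's
-- property-assertion syntax): CHANNEL should never deadlock, and hiding
-- transmit should not introduce divergence, because each transmit event
-- is separated from the next by visible write and ready events.
--
assert CHANNEL(0,0,1) :[deadlock free [F]]
assert CHANNEL(0,0,1) :[divergence free]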
--
-- In order to compare this specification with the JCSP implementation
-- we need to conceal all the additional events in the alphabet
-- of JCSPCHANNEL
--
Private = diff(alphaJCSPCHANNEL(0,0,1),alphaCHANNEL(0,0,1))
--
-- Assert that JCSPCHANNEL(o,t1,t2) \ Private is equivalent to
-- CHANNEL(o,t1,t2) in the Failures/Divergences model (by asserting
-- refinement in both directions).
--
assert CHANNEL(0,0,1) [FD= JCSPCHANNEL(0,0,1) \ Private
assert JCSPCHANNEL(0,0,1) \ Private [FD= CHANNEL(0,0,1)
--
-- That works fine with the current system where only two threads
-- are in existence, but when we increase the number of threads in the
-- system beyond two, perhaps by redefining Threads = {0,1,2}, we find that
-- the above pair of assertions no longer holds. FDR reveals that this is
-- because the new threads may `fiddle' with the state of the channel
-- implementation.
--
-- So how do we stop other threads from interfering with the channel object?
--
-- In CSP we can add a parallel process to the channel implementation which
-- blocks access to any of the relevant events! (Unfortunately this is
-- not supported by the actual Java implementation, and so must be
-- regarded as a usage rule for JCSP.)
--
PROTECTION(o,t,t') = STOP
alphaPROTECTION(o,t,t') = {
  claim.o.t'', setvar.o.v.t''.d, getvar.o.v.t''.d, waita.o.t''
  | t'' <- diff(Threads,{t,t'}),
    v <- Variables,
    d <- Data
}
SAFEJCSPCHANNEL(o,t,t') =
  JCSPCHANNEL(o,t,t')
    [ alphaJCSPCHANNEL(o,t,t') || alphaPROTECTION(o,t,t') ]
  PROTECTION(o,t,t')
assert CHANNEL(0,0,1) [FD= SAFEJCSPCHANNEL(0,0,1) \ Private
assert SAFEJCSPCHANNEL(0,0,1) \ Private [FD= CHANNEL(0,0,1)
--
-- This pair of assertions is indeed found to hold when the number of threads
-- is increased beyond 2.
--
--
-- Summary:
--
-- We have shown how to reduce the CSP model of the JCSP channel
-- implementation to a far simpler equivalent form. Further work
-- would be useful to analyse the JCSP implementation of alternation.
--
-------------------------------------------------------------------------
Dr J. M. R. Martin
Oxford Supercomputing Centre Tel: (01865) 273849
Wolfson Building Fax: (01865) 273839
Parks Road Email: jeremy.martin@comlab.ox.ac.uk
Oxford OX1 3QD WWW: http://users.ox.ac.uk/~jeremy
Last updated: Tue Nov 2 12:11:42 1999
Maintained by Peter Welch.