Java Threads mailing list archive

Re: A CSP model for Java threads

From: Jeremy Martin <jeremy.martin@computing-services.oxford.ac.uk>
Date: Fri, 30 Jul 1999 15:46:55 +0100 (BST)

Dear Peter,

Here is an FDR analysis of your model for java threads which shows that
the JCSP channel is equivalent to something very simple in CSP. I found
a couple of very minor bugs in your initial CSP specification, but
otherwise I stayed true to your definition.

I'm copying this to the mailing lists for wider discussion.

Jeremy

--
-- FDR analysis of Peter Welch's CSP model of Java threads AND
-- the JCSP implementation of a channel.
--
-- Jeremy Martin  30th July 1999 
--

--
-- We shall model a system consisting of a set of Java objects and threads.
-- 

Objects = {0}
Threads = {0,1}

--
-- We define a collection of channels to model the synchronisation
-- events.
--

channel claim, release, waita, waitb, notify, notifyall: Objects.Threads

--
-- System processes: LOCK(o) controls locking of object o's monitor and
-- SWAIT controls its auxilliary pool of waiting threads ts. 
--

LOCK(o) = claim.o?t -> LOCKED(o,t)

LOCKED(o,t) = release.o.t -> LOCK(o)
              []
              notify.o.t -> LOCKED(o,t)
              []
              notifyall.o.t -> LOCKED(o,t)

alphaLOCK(o) = { claim.o.t,release.o.t,notify.o.t,notifyall.o.t 
                 | t <- Threads }

--
-- A notify event results in one thread being non-deterministically
-- selected from the waiting pool and reactivated; a notifyall
-- event results in all the threads being activated in some
-- non-deterministic order (changed from PHW's original which was 
-- deterministic).
--

SWAIT(o,ts) =  let
                 n = card(ts)         
               within
                 (
                   waita.o?t -> SWAIT(o,union(ts, {t}))
                 ) [] (
                   notify.o?t -> 
                   if (n > 0) then
                     |~| s:ts @ waitb.o!s -> SWAIT(o,diff(ts,{s}))
                   else                      
                     SWAIT(o,{})
                 ) [] (
                   notifyall.o?t -> RELEASE(o,ts)
                 )

RELEASE(o,ts) = let 
                  n = card(ts)
                within
                  if (n > 0) then
                    |~| t:ts @ waitb.o!t -> RELEASE(o,diff(ts,{t}))
                  else
                    SWAIT(o,{})       

alphaSWAIT(o) = { 
                  waita.o.t,waitb.o.t,notify.o.t,notifyall.o.t 
                  | t <- Threads
                }

--
-- The monitor is defined as the parallel composition of the two system 
-- processes. Initially the auxiliary wait queue is empty.
--

MONITOR(o) = SWAIT(o,{}) [ alphaSWAIT(o) || alphaLOCK(o) ] LOCK(o)

alphaMONITOR(o) = union(alphaSWAIT(o),alphaLOCK(o))

--
-- Now we define the standard programmer's interface to monitors. 
-- Note that the WAIT procedure has been modified from Peter's original
-- CSP definition. An object needs to join the wait queue, by issuing
-- a waita event,  prior to releasing the monitor otherwise it might miss 
-- being notified. This was revealed by FDR in an early run.
--
-- For the moment we'll prohibit multiple locks by a particular thread
-- on a particular object. This could easily be handled by using processes
-- to represent the relevant lock counts (as Peter has shown).
--

STARTSYNC(o,me) = claim.o!me -> SKIP

ENDSYNC(o,me) = release.o!me -> SKIP

WAIT(o,me) = waita.o.me -> release.o!me -> waitb.o!me -> claim.o!me -> SKIP

NOTIFY(o,me) = notify.o.me -> SKIP

--
-- JCSP Implementation of channel:
--
-- This is translated from "P.H.Welch, Java Threads in the light of
-- occam/CSP, Proceedings of WoTUG21 1998", page 274.
--
-- The JCSP channel object has three variables (channel_empty, channel_hold
-- and local), which we shall model as processes always ready both to 
-- have their value reset or to report it to any willing thread. We
-- assume, for simplicity, that each channel has the same `boolean' type.
--

datatype Variables = channel_empty | channel_hold | local
datatype Data = TRUE | FALSE | OTHER
channel getvar, setvar: Objects.Variables.Threads.Data

--
-- Variables are initialised as TRUE.
--

VARIABLE(o,v) = VARIABLE2(o,v,TRUE)

VARIABLE2(o,v,d) = (
                     [] t:Threads @ getvar.o.v.t!d -> VARIABLE2(o,v,d)
                   ) [] (
                     [] t:Threads @ setvar.o.v.t?x -> VARIABLE2(o,v,x)
                   )

alphaVARIABLE(o,v) = {
                       getvar.o.v.t.d, setvar.o.v.t.d 
                       | t <- Threads, d <- Data
                     }

VARIABLES(o) = 
    (
        VARIABLE(o,channel_empty) 
      [alphaVARIABLE(o,channel_empty) || alphaVARIABLE(o,channel_hold)] 
        VARIABLE(o,channel_hold)
    ) 
  [union(alphaVARIABLE(o,channel_empty),alphaVARIABLE(o,channel_hold)) 
  || alphaVARIABLE(o,local)] 
    VARIABLE(o,local)

alphaVARIABLES(o) = union(union(alphaVARIABLE(o,channel_empty),
                    alphaVARIABLE(o,channel_hold)),alphaVARIABLE(o,local)) 

--
-- One purpose of JCSP is to seal off the thread/monitor synchronisation
-- calls from the programmer. Instead a read/write interface is provided
-- by two simple methods.
--
-- We shall model this interface with the following events:
--   write.o.t.d   - thread t activates java method write(d) of object o,
--                   message d is supplied for transmission.
--   ack.o.t       - call to write(d) terminates,
--   ready.o.t'    - Thread t' activates method read() of object o.
--   read.o.t'.d   - call to read() terminates, message d is returned
--
-- The JCSP channel should behave like a synchronised channel. Each
-- successful communcation requires that at some point both threads
-- are simultaneously involved.
-- 

channel read, write: Objects.Threads.Data
channel ready, ack: Objects.Threads

--
-- We model the JCSP read method as a process which repeatly waits to be
-- activated by a ready event and then executes the monitor sychronisation 
-- code to receive a message.
-- 

READ(o,t) = ready.o.t -> 
            claim.o.t -> 
            getvar.o.channel_empty.t?c ->
            (
              if (c == TRUE) then 
                setvar.o.channel_empty.t!FALSE -> 
                WAIT(o,t);
                getvar.o.channel_hold.t?mess -> 
                setvar.o.local.t!mess -> 
                NOTIFY(o,t)
              else
                setvar.o.channel_empty.t!TRUE -> 
                getvar.o.channel_hold.t?mess -> 
                setvar.o.local.t!mess -> 
                NOTIFY(o,t)
            ) ;
            getvar.o.local.t?mess -> 
            release.o.t ->    
            read.o.t!mess -> 
            READ(o,t)

alphaREAD(o,t) = {
                   claim.o.t, getvar.o.v.t.d, notify.o.t, notifyall.o.t, 
                   setvar.o.v.t.d, read.o.t.d, release.o.t, waita.o.t, 
                   waitb.o.t, ready.o.t  
                   | v <- Variables, d <- Data
                 }

--
-- The JCSP write method is similarly modelled as a repeating process,
-- activated by the write event.
--

WRITE(o,t) = write.o.t?mess -> 
             claim.o.t ->  
             setvar.o.channel_hold.t!mess ->
             getvar.o.channel_empty.t?c ->
             (if (c == TRUE) then
                setvar.o.channel_empty.t!FALSE -> 
                WAIT(o,t)
              else
                setvar.o.channel_empty.t!TRUE -> 
                NOTIFY(o,t); 
                WAIT(o,t));
             release.o.t -> 
             ack.o.t -> 
             WRITE(o,t)

alphaWRITE(o,t) = { 
                    claim.o.t, getvar.o.v.t.d, notify.o.t, notifyall.o.t,
                    setvar.o.v.t.d, write.o.t.d, release.o.t, waita.o.t, 
                    waitb.o.t, ack.o.t 
                    | v <- Variables, d <- Data
                  }


--
-- The JCSP channel is then modelled as the parallel composition of
-- processes which model its monitor, three variables, and  read and write
-- methods.
--

JCSPCHANNEL(o,t1,t2) = 
   (
     READ(o,t1) 
       [alphaREAD(o,t1) || alphaWRITE(o,t2)] 
     WRITE(o,t2)
   )
 [union(alphaREAD(o,t1),alphaWRITE(o,t2)) || 
  union(alphaMONITOR(o),alphaVARIABLES(o))] 
   (
     MONITOR(o) 
       [alphaMONITOR(o) || alphaVARIABLES(o)]
     VARIABLES(o)
   ) 

alphaJCSPCHANNEL(o,t1,t2) = 
  union(union(alphaMONITOR(o),alphaVARIABLES(o)),
        union(alphaREAD(o,t1),alphaWRITE(o,t2)))

--
-- Now we shall define a simplified model of how the JCSP channel
-- should work, and then, hopefully, we shall use FDR to show that this is
-- equivalent to the JCSP implementation. 
--
-- The simple channel consists of two parallel processes, LEFT and RIGHT,
-- to handle input and output respectively. The processes are joined
-- by a hidden channel transmit.
--  

channel transmit:Objects.Data

LEFT(o,t) = write.o.t?mess -> transmit.o!mess -> ack.o.t -> LEFT(o,t)

RIGHT(o,t') = ready.o.t' -> transmit.o?mess -> read.o.t'!mess -> RIGHT(o,t')

alphaLEFT(o,t) = {write.o.t.m, transmit.o.m, ack.o.t | m <- Data}

alphaRIGHT(o,t') = {ready.o.t', transmit.o.m, read.o.t'.m | m <- Data}

CHANNEL(o,t,t') = 
  (
      LEFT(o,t') 
    [alphaLEFT(o,t') || alphaRIGHT(o,t)] 
      RIGHT(o,t)
  ) \ {transmit.o.m | m <- Data}

alphaCHANNEL(o,t,t') = 
  {write.o.t'.m, ack.o.t', ready.o.t, read.o.t.m | m <- Data} 

--
-- In order to compare this specification with the JCSP implementation
-- we need to conceal all the additional events in the alphabet
-- of JCSPCHANNEL
--

Private = diff(alphaJCSPCHANNEL(0,0,1),alphaCHANNEL(0,0,1))

--
-- Assert that JCSPCHANNEL(o,t1,t2) \ Private is equivalent to 
-- CHANNEL(o,t1,t2) in the Failures/Divergences model (by asserting
-- refinement in both directions).
--

assert CHANNEL(0,0,1) [FD= JCSPCHANNEL(0,0,1) \ Private

assert JCSPCHANNEL(0,0,1) \ Private [FD= CHANNEL(0,0,1)

--
-- That works fine with the current system where only two threads
-- are in existence, but when we increase the number of threads in the 
-- system beyond two, perhaps by redefining Threads = {0,1,2}, we find that 
-- the above pair of assertions no longer holds. FDR reveals that this is 
-- because the new threads may `fiddle' with the state of the channel 
-- implementation.
--
-- So how do we stop other threads from interfering with the channel object?
--
-- In CSP we can add a parallel process to the channel implementation which 
-- blocks access to any of the the relevant events! (Unfortunately this is
-- not supported by the actual Java implementation, and so must be
-- regarded as a usage rule for JCSP.)
--

PROTECTION(o,t,t') = STOP

alphaPROTECTION(o,t,t') = { claim.o.t'', setvar.o.v.t''.d, 
                            getvar.o.v.t''.d, waita.o.t''
                            | t'' <- diff(Threads,{t,t'}),
                              v <- Variables,
                              d <- Data
                          }

SAFEJCSPCHANNEL(o,t,t') = 
    JCSPCHANNEL(o,t,t')
  [ alphaJCSPCHANNEL(o,t,t') || alphaPROTECTION(o,t,t') ]
    PROTECTION(o,t,t')

assert CHANNEL(0,0,1) [FD= SAFEJCSPCHANNEL(0,0,1) \ Private

assert SAFEJCSPCHANNEL(0,0,1) \ Private [FD= CHANNEL(0,0,1)

--
-- This pair of assertions is indeed found to hold when the number of threads 
-- is increased beyond 2.
--

--
-- Summary:
--
-- We have shown how to reduce the CSP model of the JCSP channel 
-- implementation to a far simpler equivalent form. Further work
-- would be useful to analyse the JCSP implementation of alternation.
--

 -------------------------------------------------------------------------
                            Dr J. M. R. Martin                
 Oxford Supercomputing Centre                          Tel: (01865) 273849
 Wolfson Building                                      Fax: (01865) 273839
 Parks Road                           Email: jeremy.martin@comlab.ox.ac.uk
 Oxford OX1 3QD                         WWW: http://users.ox.ac.uk/~jeremy


	


Last updated: Tue Nov 2 12:11:42 1999
Maintained by Peter Welch.