
Chapter 1  Concurrent programming

This part of the manual is a tutorial introduction to JoCaml. This chapter starts with small, local examples. Then it deals with the distributed features. It is assumed that the reader has some previous knowledge of Objective Caml.

1.1  Conventions

Examples are given as JoCaml source, followed by the output of the top-level (or of the compiler when prompted to print types). The JoCaml top-level provides an interactive environment, much like the Objective Caml top-level.

In order to try the examples, you can either type them into the top-level, launched by the command jocaml, or concatenate the source chunks into some file a.ml, compile a.ml with the command jocamlc a.ml, and finally run the produced code with the command ./a.out.

1.2  Basics

JoCaml programs are made of processes and expressions. Roughly, processes are executed asynchronously and produce no result, whereas expressions are evaluated synchronously and their evaluation produces values. For instance, Objective Caml expressions are JoCaml expressions. Processes communicate by sending messages on channels (a.k.a. port names). Messages carried by channels are made of zero or more values, and channels are values themselves. In contrast with other process calculi (such as the pi-calculus and its derived programming language Pict), channels and the processes that listen on them are defined by a single language construct. This makes it possible to view (and implement) channels as functions when they are used as functions.

JoCaml programs are first organized as a list of top-level statements. A top-level statement is a declaration (such as an Objective Caml binding let x = 1 or a channel binding) or an expression. Top-level statements are terminated by an optional ;; that triggers evaluation in interactive mode.
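
For instance, here is a short top-level session made of one declaration followed by one expression (the name x is ours):

#let x = 1
 ;;
val x : int = 1
#print_int (x+1)
 ;;
- : unit = ()
2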

1.2.1  Simple channel declarations

Channels, or port names, are the main new primitive values of JoCaml.

Users can create new channels with a new kind of def binding, which should not be confused with the ordinary value let binding. The right-hand side of the definition of a channel a is a process that will be spawned whenever some message is sent on a. Additionally, the contents of messages received on a are bound to formal parameters.

For instance, here is the definition of an echo channel:

#def echo(x) = print_int x; 0
 ;;
val echo : int Join.chan = <abstr>

The new channel echo has type int Join.chan, which is the type of channels carrying values of type int. Sending an integer i on echo fires an instance of the guarded process print_int x; 0, which prints the integer on the console. Since the Objective Caml expression print_int x returns the value (), it is necessary to append a ; 0 that discards this value; 0 is the empty process. As regards syntax, the parentheses around the formal argument “x” are mandatory.

Channel echo is an asynchronous channel: sending a message on it is a non-blocking operation, and it is not possible to know when the actual printing takes place.

1.2.2  Processes

Processes are the new core syntactic class of JoCaml. The most basic process sends a message on an asynchronous channel, such as the channel echo just introduced. Since only declarations and expressions are allowed at top-level, processes are turned into expressions by “spawning” them: they are introduced by the keyword “spawn”.

#spawn echo(1)
 ;;
- : unit = ()
#spawn echo(2)
 ;;
- : unit = ()
12

Processes introduced by “spawn” are executed concurrently. The program above may either echo 1 then 2, or echo 2 then 1. Thus, the output above may be 12 or 21, depending on the implementation. The processes echo(1) and echo(2) are examples of the most basic process: sending a message on some channel. The syntax for message sending is the same as the one for function application in Objective Caml. Hence, writing echo 1 and echo 2 is also correct. However, in this manual, we conventionally write message sending with arguments between parentheses.

Concurrent execution also occurs inside processes, using the parallel composition operator “&”. This provides a more concise, semantically equivalent, alternative to the previous example:

#spawn echo(1) & echo(2)
 ;;
- : unit = ()
21

Composite processes also include conditionals (if), matching (match) and local bindings (let … in and def … in). Process grouping is done with parentheses “(” and “)” or the equivalent “begin” and “end”, just as in Objective Caml.

#spawn begin
   let x = 1 in
   (let y = x + 1 in echo(y) & echo(y+1)) & echo(x)
 end
 ;;
- : unit = ()
123

Once again, the program above may echo the integers 1, 2 and 3 in any order. Grouping is necessary around the process let y = x + 1 in … to restrict the scope of y, so that its evaluation occurs independently of the process echo(x).

Processes may also include sequences. The general form of a sequence inside a process is expression; process, where the result of expression is discarded. As expression can itself be a sequence, one may write:

#spawn begin
   print_int 1; print_int 2; echo(3)
 end
 ;;
- : unit = ()
123

A sequence may be terminated by an empty process that does nothing and is denoted by “0”. Thus, an alternative to the previous example is as follows:

#spawn begin
   print_int 1; print_int 2; print_int 3; 0
 end
 ;;
- : unit = ()
123

This is why print_int x; 0 in the definition of the echo channel is considered as a process.

1.2.3  More on channels

The guarded process in a channel definition can spawn several messages, as in a stuttering echo channel:

#def echo_twice(x) = echo(x) & echo(x)
 ;;
val echo_twice : int Join.chan = <abstr>

It is also possible to define such a channel directly, without referring to the channel echo, by using the Objective Caml function print_int. In this case, it is necessary to enclose each use of print_int in “(” and “)”, as in this new definition of echo_twice:

#def echo_twice(x) = (print_int x; 0) & (print_int x; 0)
 ;;
val echo_twice : int Join.chan = <abstr>

Such grouping is necessary because “&” binds more tightly than “;”, as in:

#def echo3(x) = print_int x; echo(x) & echo(x)
 ;;
val echo3 : int Join.chan = <abstr>

Channels may accept tuples of values as arguments, and those arguments can be destructured with pattern matching notation. For example, the following channel strange_echo accepts pairs, as shown by its type.

#def strange_echo(x,y) = echo (x+y) & echo (y-x)
 ;;
val strange_echo : (int * int) Join.chan = <abstr>

Hence, in JoCaml, polyadic channels are simply expressed as (monadic) channels that carry tuples.

Port names are first-class values in JoCaml. They can be sent as messages on other port names. As a result, higher order “ports” can be written, such as

#def twice(f,x) = f(x) & f(x)
 ;;
val twice : ('a Join.chan * 'a) Join.chan = <abstr>

The type for twice is polymorphic: it includes a type variable 'a that can be replaced by any type. Thus twice is a channel that takes a channel of type 'a Join.chan and a value of type 'a as arguments.

For instance, 'a can be the type of integers or the type of strings:

#def echo_string(s) = print_string s; 0
 ;;
val echo_string : string Join.chan = <abstr>
#spawn twice(echo,0) & twice(echo_string,"X")
 ;;
- : unit = ()
XX00

1.2.4  Synchronous channels

One can perfectly well have a process “return a value”: it suffices to parameterize it with a continuation.

#def succ(x,k) =  print_int x; k(x+1)
 ;;
val succ : (int * int Join.chan) Join.chan = <abstr>

Here, succ prints the value of x, and then sends the message x+1 to its continuation k. We insist on then: when the message is received by whoever is waiting at the other end, we can be sure that x has been printed. More precisely, if the receiver also prints something, that something will appear on the console after x. Let us define a continuation for succ:

#def k(x) = succ(x,echo)
 ;;
val k : int Join.chan = <abstr>
#spawn succ(1,k)
 ;; 
- : unit = ()
123

And we have yet another example of printing 123 in that order.

Although it can be fun, continuation passing style is not very convenient. JoCaml provides synchronous channels to define processes that return values more directly. The previous example can be written as follows:

#def succ(x) = print_int x; reply x+1 to succ
 ;;
val succ : int -> int = <fun>

The type of succ is the functional type int -> int, the type of functions that take one integer as argument and return an integer. However, succ is not a function, since it is introduced by a def binding. Because the process terminates with reply x+1 to succ, succ is a synchronous channel that returns the value x+1. The mechanism for returning values on synchronous channels differs from the one for functions: it uses a reply/to construct whose semantics is to send back some values as the result. This is the first difference with plain Objective Caml functions, which implicitly return the value of the guarded expression instead of using an explicit reply/to.

Synchronous names can be used to support a functional programming style. A traditional example is the Fibonacci function.

#def fib(n) =
   if n <= 1 then reply 1 to fib
   else reply fib(n-1) + fib(n-2) to fib
 ;;
val fib : int -> int = <fun>
#print_int (fib 10)
 ;;
- : unit = ()
89

In contrast with Objective Caml let definitions, channel definitions are always potentially recursive.
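
For instance, a channel can send messages to itself. The following sketch (the name countdown is ours) prints a decreasing sequence of integers; each recursive step prints n before sending the next message, hence the digits appear in decreasing order:

#def countdown(n) = if n > 0 then (print_int n ; countdown(n-1))
 ;;
val countdown : int Join.chan = <abstr>
#spawn countdown(3)
 ;;
- : unit = ()
321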

Since synchronous channels have the same types as functions and behave much like them, they may seem useless. However, there are significant differences, as explained by the next section on join patterns and by the next chapter on distributed programming.

1.3  Join patterns

Join patterns significantly extend port name definitions.

1.3.1  Basics

A join pattern defines several ports simultaneously and specifies a synchronization pattern between these co-defined ports. For instance, the following source fragment defines two synchronizing port names fruit and cake:

#def fruit(f) & cake(c) = print_endline (f^" "^c) ; 0
 ;;
val fruit : string Join.chan = <abstr>
val cake : string Join.chan = <abstr>

To trigger the guarded process print_endline (f^" "^c) ; 0, messages must be sent on both fruit and cake.

#spawn fruit("apple") & cake("pie")
 ;;
- : unit = ()
apple pie

The parallel composition operator “&” appears both in join patterns and in processes. This highlights the kind of synchronization that the pattern matches.

Join definitions such as the one for fruit and cake provide a simple means to express non-determinism.

#spawn fruit "apple" & fruit "raspberry" & cake "pie" & cake "crumble"
 ;;
- : unit = ()
raspberry pie
apple crumble

Two cakes must appear on the console, but both combinations of fruits and cakes are correct.

Composite join definitions can specify several synchronization patterns.

#def apple() & pie() = print_string "apple pie" ; 0
  or raspberry() & pie() = print_string "raspberry pie" ; 0
 ;;
val apple : unit Join.chan = <abstr>
val raspberry : unit Join.chan = <abstr>
val pie : unit Join.chan = <abstr>

Observe that the name pie is defined only once. Thus, pie potentially takes part in two synchronizations. This co-definition is expressed by the keyword or.

Again, internal choice is performed when only one invocation of pie is present:

#spawn apple() & raspberry() & pie()
 ;;
- : unit = ()
raspberry pie

1.3.2  ML pattern matching in join patterns

Up to now, we have seen that the formal argument of a channel definition can be a variable or a tuple of variables. More generally, such a formal argument can be a pattern (in the Objective Caml pattern matching sense):

#type fruit = Apple | Raspberry | Cheese
 and desert = Pie | Cake
 ;;
type fruit = Apple | Raspberry | Cheese
and desert = Pie | Cake
#def f(Apple) & d(Pie) = echo_string("apple pie")
 or  f(Raspberry) & d(Pie) = echo_string("raspberry pie")
 or  f(Raspberry) & d(Cake) = echo_string("raspberry cake")
 or  f(Cheese) & d(Cake) = echo_string("cheese cake")
 ;;
val f : fruit Join.chan = <abstr>
val d : desert Join.chan = <abstr>

The definition above yields four competing behaviors on the pair of channels f and d. For instance:

#spawn f(Raspberry) & d(Pie) & d(Cake)
 ;;
- : unit = ()
raspberry cake

And we get either “raspberry pie” or “raspberry cake”.

The formal argument of channels can be any Objective Caml pattern. Here, by using “or-patterns” and as bindings for fruits, we can be more concise:

#let string_of_fruit = function
 | Apple -> "apple"
| Raspberry -> "raspberry"
| Cheese -> "cheese"
 ;;
val string_of_fruit : fruit -> string = <fun>
#def f(Apple|Raspberry as x) & d(Pie) = echo_string(string_of_fruit x^" pie")
 or  f(Raspberry|Cheese as x) & d(Cake) = echo_string(string_of_fruit x^" cake")
 ;;
val f : fruit Join.chan = <abstr>
val d : desert Join.chan = <abstr>
#spawn f(Raspberry) & d(Pie) & d(Cake)
 ;;
- : unit = ()
raspberry pie

And again, the display above can be either of the two raspberry desserts.

As another example, the following definition prints the sorted merge of two sorted lists sent as messages on channels i1 and i2.

#def i1([]) & i2([]) = 0
 or  i1(x::xs) & i2([]) = print_int x ; i1(xs) & i2([])
 or  i1([]) & i2(y::ys) = print_int y ; i1([]) & i2(ys)
 or  i1(x::xs) & i2(y::ys) =
       if x < y then begin print_int x ; i1(xs) & i2(y::ys) end
       else if y < x then begin print_int y ; i1(x::xs) & i2(ys) end
       else begin print_int x ; i1(xs) & i2(ys) end
 ;;
val i1 : int list Join.chan = <abstr>
val i2 : int list Join.chan = <abstr>
#spawn i1([1;3;4]) & i2([2;3])
 ;;
- : unit = ()
1234

It is important to notice that, by contrast with Objective Caml pattern matching, ambiguous matches are indeed ambiguous: as soon as a message matches a pattern, it may be consumed, regardless of other receivers on the same channel.

#def c([]) = echo_string "Nil"
 or  c(_)  = echo_string "Anything"
 ;;
val c : 'a list Join.chan = <abstr>
#spawn c([])
 ;;
- : unit = ()
Anything

In the example above, you can see either “Anything” or “Nil” depending upon unspecified implementation details. To get the textual priority rule of Objective Caml matching semantics, use the match construct.

#def c(x) =
   match x with
   | [] -> echo_string "Nil"
   | _  -> echo_string "Anything"
 ;;
val c : 'a list Join.chan = <abstr>
#spawn c([])
 ;;
- : unit = ()
Nil

1.3.3  Mixing asynchronous and synchronous channel definitions

Join patterns are the programming paradigm for concurrency in JoCaml. They allow the encoding of many concurrent data structures. For instance, the following code defines a counter:

#def count(n) & inc() = count(n+1) & reply to inc
  or count(n) & get() = count(n) & reply n to get
 ;;
val inc : unit -> unit = <fun>
val count : int Join.chan = <abstr>
val get : unit -> int = <fun>
#spawn count(0)
 ;;
- : unit = ()

This definition calls for two remarks. First, join patterns may mix synchronous and asynchronous messages. Second, the usage of the name count above is a typical way of ensuring mutual exclusion. For the moment, assume that there is at most one active invocation on count. When one invocation is active, count holds the counter value as a message and the counter is ready to be incremented or examined. Otherwise, some operation is being performed on the counter, and pending operations are postponed until the operation being performed has left the counter in a consistent state. As a consequence, the counter may be used consistently by several threads.

#spawn (inc() ; inc() ; 0) & (inc() ; 0)
 ;;
- : unit = ()
#def wait () =
   let x = get () in
   if x < 3 then wait () 
   else begin
     print_string "three is enough !!!" ; print_newline () ; 0
   end
 ;;
val wait : unit Join.chan = <abstr>
#spawn wait ()
 ;;
- : unit = ()
three is enough !!!

Ensuring correct counter behavior in the example above requires some programming discipline: only one initial invocation on count must be made. If there is more than one simultaneous invocation on count, then mutual exclusion is lost. If there is no initial invocation on count, then the counter will not work at all. This can be avoided by making the count, inc and get names local to a create_counter definition, and then by exporting inc and get while hiding count, taking advantage of lexical scoping rules.

#let create_counter () =
   def count(n) & inc0() = count(n+1) & reply to inc0
    or count(n) & get0() = count(n) & reply n to get0 in
   spawn count(0) ;
   inc0, get0
 ;;
val create_counter : unit -> (unit -> unit) * (unit -> int) = <fun>
#let inc,get = create_counter ()
 ;;
val inc : unit -> unit = <fun>
val get : unit -> int = <fun>

This programming style is reminiscent of “object-oriented” programming: a counter is a thing called an object, it has some internal state (count and its argument), and it exports some methods to the external world (here, inc and get). The constructor create_counter creates a new object, initializes its internal state, and returns the exported methods. As a consequence, several counters may be allocated and used independently.
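
For instance, here is one possible session in which two independent counters are created (the names inc1, get1, inc2 and get2 are ours); incrementing the first counter does not affect the second:

#let inc1,get1 = create_counter ()
 ;;
val inc1 : unit -> unit = <fun>
val get1 : unit -> int = <fun>
#let inc2,get2 = create_counter ()
 ;;
val inc2 : unit -> unit = <fun>
val get2 : unit -> int = <fun>
#inc1() ; print_int (get1()) ; print_int (get2())
 ;;
- : unit = ()
10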

1.4  Control structures

Join pattern synchronization can express many common programming paradigms, either concurrent or sequential.

1.4.1  Some classical synchronization primitives

Locks

Join pattern synchronization can be used to emulate simple locks:

#let new_lock () =
   def free() & lock() = reply to lock
   and unlock() = free() & reply to unlock in
   spawn free() ;
   lock,unlock
 ;;
val new_lock : unit -> (unit -> unit) * (unit -> unit) = <fun>

Threads try to acquire the lock by performing a synchronous call on channel lock. Due to the definition of lock, this consumes the name free, so only one thread can get a response at a time. Another thread that attempts to acquire the lock is blocked until the thread that holds the lock releases it by the synchronous call to unlock, which fires another invocation of free. As in Objective Caml, it is possible to introduce several bindings with the and keyword. These bindings are recursive.

To give an example of lock usage, we introduce a function that outputs its string argument several times:

#let print_n n s =
   for i = 1 to n do
     print_string s; Thread.delay 0.01
   done
 ;;
val print_n : int -> string -> unit = <fun>

The Thread.delay calls prevent the same thread from running long enough to print all its strings.

Now consider two threads, one printing *’s, the other printing +’s.

#spawn (print_n 21 "*" ; 0) & (print_n 21 "+" ; 0)
 ;;
- : unit = ()
+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*

As threads execute concurrently, their outputs may mix, depending upon scheduling. However, one can use a lock to delimit a critical section and prevent the interleaving of *’s and +’s.

#let lock, unlock = new_lock ()
 ;;
val lock : unit -> unit = <fun>
val unlock : unit -> unit = <fun>
#spawn begin
     (lock() ; print_n 21 "*" ; unlock() ; 0) 
   & (lock() ; print_n 21 "+" ; unlock() ; 0)  
 end
 ;;
- : unit = ()
+++++++++++++++++++++*********************

Barriers

A barrier is another common synchronization mechanism. Basically, barriers define synchronization points in the execution of parallel tasks. Here is a simple barrier that synchronizes two threads:

#def join1 () & join2 () = reply to join1 & reply to join2
 ;;
val join1 : unit -> unit = <fun>
val join2 : unit -> unit = <fun>

The following two threads print ba or ab between matching parentheses:

#spawn begin
     (print_string "(" ; join1 () ; print_string "a" ; join1() ;
      print_string ")" ; 0)
   & (join2 () ; print_string "b" ; join2 () ; 0) 
 end
 ;;
- : unit = ()
(ab)

Waiting for n events to occur

A frequent idiom is for a program to fork n concurrent tasks, and then to wait for those to complete. We can reformulate the informal “fork(s) and wait” as follows: channel count holds an integer that records the number of events still to be waited for. Events to be counted are materialized by sending empty messages on channel tick.

#def count(n) & tick() = count(n-1)
 or  count(0) & wait() = reply to wait
 ;;
val tick : unit Join.chan = <abstr>
val count : int Join.chan = <abstr>
val wait : unit -> unit = <fun>

By sending an initial message n on count, we enable counting n events.

#let n = 5
 ;;
val n : int = 5
#spawn count(n)
 ;;
- : unit = ()

Finally, one can wait for the n events to occur by calling the synchronous channel wait. The reaction rule count(0) & wait() = reply to wait is to be noticed: the formal argument of count is a pattern, here the integer constant 0. This means that the guarded process will be firable only when message 0 is pending on channel count — See Section 1.3.2.

Beware: if, at some time, count(0), tick() and wait() are all active, then the clause count(n) & tick() = count(n-1) can be selected. As an immediate consequence, count(0) will never be active again, and the agent waiting on wait will stay blocked forever. However, if at most n messages are sent on tick, then the device above works as expected.

The “count n events” idiom is so frequent that we build a countdown generator as follows:

#let create_countdown n =
   def count(n) & tick() = count(n-1)
   or  count(0) & wait() = reply to wait in
   spawn count(n) ;
   tick,wait
 ;;
val create_countdown : int -> unit Join.chan * (unit -> unit) = <fun>

Now we get a fresh countdown started at n by create_countdown n. More precisely, “a fresh countdown” is the pair of asynchronous tick and synchronous wait, where wait will answer once n messages are sent on tick. Also observe that the internal channel count is now kept secret and that the countdown generator takes care of initialisation.

In the following code, n messages are sent on tick by n asynchronous agents once they have printed one digit, while the closing parenthesis is printed only after wait() has returned.

#let n = 5
 ;;
val n : int = 5
#let tick,wait = create_countdown n
 ;;
val tick : unit Join.chan = <abstr>
val wait : unit -> unit = <fun>
#print_string "(" ;
 for i = 0 to n-1 do
   spawn begin print_int i ; tick() end
 done ;
 wait () ;
 print_string ")"
 ;;
- : unit = ()
(24301)

As a result, the numbers 0, 1, 2, 3, 4 appear in any order, between parentheses.

Collecting n results

A frequent variation on the idea of waiting for n events is collecting n results. Let us write slightly abstract code. We wish to combine n results using function f.

#let create_collector f y0 n =
   def count(y,n) & collect(x) = count(f x y,n-1)
   or  count(y,0) & wait() = reply y to wait in
   spawn count(y0,n) ;
   collect, wait
 ;;
val create_collector :
  ('a -> 'b -> 'b) -> 'b -> int -> 'a Join.chan * (unit -> 'b) = <fun>

The create_collector function is quite similar to the create_countdown function. Additionally, it accumulates the messages sent on collect (tick in the case of the countdown) into the first component of the internal message on count. Incidentally, observe that create_countdown can be implemented by using the more general create_collector.

#let create_countdown n = create_collector (fun () () -> ()) () n
 ;;
val create_countdown : int -> unit Join.chan * (unit -> unit) = <fun>

The following collect_as_sum will collect n integers on channel add and send their sum as the result of synchronous wait.

#let collect_as_sum n =
   let add, wait = create_collector (+) 0 n in
   add,wait
 ;;
val collect_as_sum : int -> int Join.chan * (unit -> int) = <fun>

One now easily computes the sum of the first n integers as follows:

#let n = 10
 ;;
val n : int = 10
#let add,wait = collect_as_sum n
 ;;
val add : int Join.chan = <abstr>
val wait : unit -> int = <fun>
#spawn for i=0 to n-1 do add(i) done
 ;;
- : unit = ()
#print_int (wait())
 ;;
- : unit = ()
45

It is to be noticed that collectors are provided by the JoCaml standard library module JoinCount.Collector.

#module Col = JoinCount.Collector
 ;;
module Col :
  sig
    type ('a, 'b) t =
      ('a, 'b) JoinCount.Collector.t = {
      collect : 'a Join.chan;
      wait : unit -> 'b;
    }
    val create : ('a -> 'b -> 'b) -> 'b -> int -> ('a, 'b) t
  end

Generally speaking, the standard library of JoCaml makes heavy use of records, as illustrated by the type of collectors (JoinCount.Collector.t above). As a matter of fact, records offer a convenient way to pack together the public channels of join-definitions.

Here is another way of summing the first n integers asynchronously, this time relying on the collectors of the standard library.

#let col = Col.create (+) 0 n
 ;;
val col : (int, int) Col.t = {Col.collect = <abstr>; Col.wait = <fun>}
#spawn for i=0 to  n-1 do col.Col.collect(i) done
 ;;
- : unit = ()
#print_int (col.Col.wait())
 ;;
- : unit = ()
45

In the code above, notice that the field names of records are given in fully qualified notation, as in col.Col.wait().

Bi-directional channels

Bi-directional channels appear in most process calculi. In the asynchronous pi-calculus, for instance, and for a given channel c, a value v can be sent asynchronously on c (written c![v]) or received from c and bound to some variable x in some guarded process P (written c?x.P). Any process can send and receive on the channels it knows. In contrast, a JoCaml process that knows a channel can only send messages on it, whereas a unique channel definition receives all messages. Finally, the scope of a pi-calculus channel name c is delimited by the new c in P operator. Such an operator does not exist in JoCaml, since join definitions are themselves binding constructs.

Nonetheless, bi-directional channels can be defined in JoCaml as follows:

#let new_pi_channel () =
   def send(x) & receive() = reply x to receive in
   send, receive
 ;;
val new_pi_channel : unit -> 'a Join.chan * (unit -> 'a) = <fun>

A pi-calculus channel is implemented by a join definition with two port names. The port name send is asynchronous and is used to send messages on the channel. Such messages can be received by making a synchronous call to the other port name receive. Let us now “translate” the pi-calculus process

new c,d in c![1] | c![2] | c?x.d![x+x] | d?y.print(y)

We get:

#spawn begin 
   let sc, rc = new_pi_channel ()
   and sd, rd = new_pi_channel () in
     sc(1) &  sc(2) &  (let x = rc() in sd(x+x)) 
   & (let y = rd() in print_int y ; 0) 
 end
 ;;
- : unit = ()
4

Observe that, by contrast with the JoCaml semantics of receptors, the process that performs x+x runs only once.
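
This one-shot behavior can be checked by sending two messages on a pi-calculus channel and performing two successive receptions; each synchronous call consumes exactly one message (the names s and r are ours):

#spawn begin
   let s,r = new_pi_channel () in
   s(1) & s(2) & (print_int (r()) ; print_int (r()) ; 0)
 end
 ;;
- : unit = ()
12

Here again, the output may be 12 or 21, since the order in which the two messages are consumed is not specified.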

Synchronous pi-calculus channels are encoded just as easily as asynchronous ones: it suffices to make send synchronous:

#let new_pi_sync_channel () =
   def send(x) & receive() = reply x to receive & reply to send in
   send, receive
 ;;
val new_pi_sync_channel : unit -> ('a -> unit) * (unit -> 'a) = <fun>

1.4.2  Timeout

We can define a function timeout that attempts the computation of a function f applied to some argument. If the computation takes too long, we do not wait for its result. Instead, we acknowledge that a timeout occurred by returning the value None.

#let timeout t f x =
   def wait() & finished(r) = reply Some r to wait
   or  wait() & timeout() = reply None to wait in
   spawn begin
     finished(f x) &
     begin Thread.delay t; timeout() end
   end ;
   wait()
 ;;
val timeout : float -> ('a -> 'b) -> 'a -> 'b option = <fun>

It is to be noticed that, in case a timeout occurs, the computation of f is not interrupted: there is no way to “kill” a running agent. However, if computing f x takes more time than the timeout value t, then timeout t f x will return None. Observe that we here rely upon the underlying (Objective Caml) thread implementation, i.e. we use the function Thread.delay from the Thread library.

As an example of a computation that takes a long time to execute, we select an infinite loop.

#let rec loop () =  Thread.yield () ; loop ()
 ;;
val loop : unit -> 'a = <fun>

Observe that we call Thread.yield (), so as to give other threads a chance to be scheduled. Here again we rely upon the underlying thread scheduler.

Finally, we execute loop () under the control of a timeout of 0.5 seconds.

#match timeout 0.5 loop () with
 | None -> print_string "Timeout !"
 | Some _ -> print_string "No timeout ! "
 ;;
- : unit = ()
Timeout !

A timeout occurred, in the sense that the call to the timeout function returned after about half a second. However, the controlled computation is still alive, consuming resources. For a solution to this issue, see Section 1.10.1.

1.4.3  Iterations

Join patterns are also useful for expressing various programming control structures. Our first example deals with iteration over an integer interval.

Simple loop

Asynchronous loops can be used when the execution order for the iterated actions is irrelevant.

#def loop(a,x) = if x > 0 then (a() & loop(a,x-1))
 ;;
val loop : (unit Join.chan * int) Join.chan = <abstr>
#def echo_star() = print_string "*" ; 0
 ;;
val echo_star : unit Join.chan = <abstr>
#spawn loop(echo_star,5)
 ;;
- : unit = ()
*****

There is also a for loop at the process level.

#def loop(a,x) = for i=1 to x do a() done
 ;;
val loop : (unit Join.chan * int) Join.chan = <abstr>
#spawn loop(echo_star,5)
 ;;
- : unit = ()
*****

List iterator

In this section we examine the asynchronous counterpart of the list iterator List.iter:

#let iter f xs = List.iter (fun x -> spawn f(x)) xs
 ;;
val iter : 'a Join.chan -> 'a list -> unit = <fun>

Hence iter c [e1; e2; …; en] will act as c(e1) & c(e2) & … & c(en).

#let digits = [0;1;2;3;4;5;6;7;8;9]
 ;;
val digits : int list = [0; 1; 2; 3; 4; 5; 6; 7; 8; 9]
#iter echo digits
 ;;
- : unit = ()
2361798450

Of course, iter can be written as a channel and without using List.iter.

#let iter c =
   def do_iter(x::xs) = c(x) & do_iter(xs)
   or  do_iter([]) = 0 in
   (fun xs -> spawn do_iter(xs))
 ;;
val iter : 'a Join.chan -> 'a list -> unit = <fun>
#iter echo digits
 ;;
- : unit = ()
2098765431

For the iteration over a list to produce a result (i.e. to get the asynchronous counterpart of List.fold), one combines the asynchronous iterator and the collector of Section 1.4.1. For instance, here is how one can sum the squares of all the integers in a list.

#let square x = x*x
 ;;
val square : int -> int = <fun>
#let sum xs =
   let add,wait = collect_as_sum (List.length xs) in
   def add_square(x) = add(square x) in
   iter add_square xs ;
   wait()
 ;;
val sum : int list -> int = <fun>
#print_int (sum digits)
 ;;
- : unit = ()
285

Another variation on the same idea is making a list of squares, order being irrelevant.

#let squares xs =
   let add,wait = create_collector (fun x xs -> x::xs) [] (List.length xs) in
   def add_square(x) = add(square(x)) in
   iter add_square xs ;
   wait()
 ;;
val squares : int list -> int list = <fun>
#squares digits
 ;;
- : int list = [64; 36; 25; 9; 81; 49; 1; 0; 16; 4]

As such, the functions sum and squares are not very interesting, however they may act as an introduction to the more involved topic of distributed iteration.

Distributed iterations

Sharing iterations between several “agents” requires more work. Let us informally define an agent as some computing unit. In this section, an agent is represented by a synchronous channel. In a more realistic setting, different agents would reside on different computers.

For instance, here are two agents square1 and square2. The agent square1 models a fast machine, whereas square2 models a slow machine thanks to an artificial delay.

#let square1 i =
   print_string "(" ;
   let r = i*i in
   print_string ")" ;
   r
 ;;
val square1 : int -> int = <fun>
#let square2 i =
   print_string "<" ;
   let r = i*i in
   Thread.delay 0.5 ;
   print_string ">" ;
   r
 ;;
val square2 : int -> int = <fun>

Additionally, square1 and square2 differ marginally in their console output: the fast square1 prints “(” when it starts and “)” when it is done, while square2 uses “<” and “>” for the same purpose.

Sharing a loop between several agents amounts to allocating the iterations to be performed among them. The following function create_sum returns a register channel and a wait channel. An agent registers by sending its computing channel on register. The final loop result is returned by wait.

#let create_sum n =
   let add,wait = collect_as_sum n in
   def add_square(x) & register(square) = add(square(x)) & register(square) in
   for i=0 to n-1 do spawn add_square(i) done ;
   register, wait
 ;;
val create_sum : int -> (int -> int) Join.chan * (unit -> int) = <fun>

The key difference with the asynchronous sum loop from the previous section resides in an indirection: instead of sending x squared directly on the channel add, the new add_square channel extracts a squaring function from the channel register. As a consequence, the agents square1 and square2 may now compete for loop iterations, provided two invocations register(square1) and register(square2) are active.

#let register, wait = create_sum 32
 ;;
val register : (int -> int) Join.chan = <abstr>
val wait : unit -> int = <fun>
#spawn register(square2) & register(square1)
 ;;
- : unit = ()
#print_int (wait ());
 ;;
- : unit = ()
()()()()()()()()()()()()()()()()()()()()()()()()()()()()()()()()10416

The distributed loop above is not satisfactory, since it does not take the relative computing speeds of square1 and square2 into account while allocating iterations. The add(square(x)) jobs are spawned asynchronously, so that all iterations are performed concurrently. As a result, the iteration space is partitioned randomly between square1 and square2, as illustrated by the output above, which may differ from one run to the next. This leads to a poor load balance, in fact to no load balance at all.

A better solution is for an agent to execute its share of work in sequence rather than concurrently. This is achieved by the slightly modified definition for def add_square(x) & register(square) below:

#let create_sum n =
    let add,wait = collect_as_sum n in

    def add_square(x) & register(square) =
      let r = square x in add(r) & register(square) in

    for i=0 to n-1 do spawn add_square(i) done ;
    register, wait
 ;;
val create_sum : int -> (int -> int) Join.chan * (unit -> int) = <fun>

In the new definition, register(square) is launched again only once square(x) is computed. This is so because of the semantics of let x = E in P, which says that process P executes only once expression E is evaluated.

#let register, wait = create_sum 32
 ;;
val register : (int -> int) Join.chan = <abstr>
val wait : unit -> int = <fun>
#spawn register(square1) & register(square2)
 ;;
- : unit = ()
#print_int (wait ())
 ;;
- : unit = ()
<()()()()()()()()()()()()()()()()()()()()()()()()()()()()()()()>10416

Finally, at the fine-tuning level, one may object that messages on the add_square channel are a bit out of control: we send a burst of such messages with a for loop, and they probably accumulate in some internal queue, waiting for agents to retrieve them.

A final version of create_sum takes care not to send too many messages on add_square concurrently. Basically, we send a new message only once a registered agent has retrieved one.

#let create_sum n =
    let add,wait = collect_as_sum n in
    def add_square(x) & register(square) =
      (if x > 0 then add_square(x-1)) &
      (let r = square x in register(square) & add(r)) in
    spawn add_square(n-1) ;
    register, wait
 ;;
val create_sum : int -> (int -> int) Join.chan * (unit -> int) = <fun>
#let register, wait = create_sum 32
 ;;
val register : (int -> int) Join.chan = <abstr>
val wait : unit -> int = <fun>
#spawn register(square1) & register(square2)
 ;;
- : unit = ()
#print_int (wait ())
 ;;
- : unit = ()
<()()()()()()()()()()()()()()()()()()()()()()()()()()()()()()()>10416

1.5  Data structures

1.5.1  A concurrent reference cell

The state of a concurrent object can be represented as a message matched by join patterns, and altered by invoking the appropriate methods. Here is a definition for a reference cell. One method (get) examines the content of the cell, while another (put) alters it.

#let create_ref y0 =
   def state(y) & get() = state(y) & reply y to get
    or state(_) & put(y) = state(y) & reply to put in
   spawn state(y0) ; 
   (get, put)
 ;;
val create_ref : ’a -> (unit -> ’a) * (’a -> unit) = <fun>

Here, the internal state of a cell is its content; it is stored as a message y on the channel state. Lexical scoping is used to keep the state internal to a given cell.

#let gi, pi = create_ref 0
 and gs, ps = create_ref ""
 ;;
val gi : unit -> int = <fun>
val pi : int -> unit = <fun>
val gs : unit -> string = <fun>
val ps : string -> unit = <fun>
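
As a quick illustration, each put replaces the message stored on the internal state channel, while get reads it back. Here is a small sketch using the two cells defined above (this snippet is not part of the original session):

pi 3 ;                  (* the integer cell now holds 3 *)
ps "hello" ;            (* the string cell is independent *)
print_int (gi ()) ;     (* prints 3 *)
print_string (gs ())    (* prints hello *)

Note that get blocks until a message is available on state; a cell built by create_ref always answers, since state(y0) is spawned at creation time.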

1.5.2  A concurrent stack

A stack is a data structure that provides push and pop operations with LIFO (Last In, First Out) semantics.

#let new_stack () =
   def state (s) & push (v) = state (v::s) & reply to push
    or state (x::s) & pop () = state (s) & reply x to pop in
   spawn state([]); 
   pop, push
 ;;
val new_stack : unit -> (unit -> ’a) * (’a -> unit) = <fun>

The first join pattern (state(s) & push(v)) is an ordinary one: it is matched whenever there are messages on both state and push. The second join pattern (state (x::s) & pop ()) uses algebraic pattern matching. This pattern is matched only when there are messages on both state and pop and the state content is a non-empty list. As a consequence, an attempt to retrieve an element from an empty stack is not an error: answering to pop is simply postponed until the stack fills.

#let pop, push = new_stack ()
 ;;
val pop : unit -> ’_a = <fun>
val push : ’_a -> unit = <fun>
#spawn echo(pop())
 ;;
- : unit = ()
#push(1)
 ;;
- : unit = ()
1

1.5.3  Buffers

A buffer is a kind of double-ended queue. Elements are added at one end with put and retrieved at the other end with get. That is, a buffer preserves element ordering.

#type ’a buffer = { put : ’a -> unit ; get : unit -> ’a ; }
 ;;
type ’a buffer = { put : ’a -> unit; get : unit -> ’a; }

For us, put and get are synchronous channels, and a get attempt on an empty buffer is blocking. By contrast, a put attempt always succeeds: our buffer is unbounded.

Using the classical trick of encoding a FIFO queue functionally as a pair of lists, we write:

#let create_buffer () =
   def alive (xs,y::ys) & get() = alive(xs,ys) & reply y to get
   or alive(_::_ as xs,[]) & get() = alive([], List.rev xs) & reply get() to get
   or alive(xs,ys) & put(x) = alive(x::xs,ys) & reply to put in
   spawn alive([],[]) ;
   {put=put; get=get;}
 ;;
val create_buffer : unit -> ’a buffer = <fun>

We shall assume that the buffer is used by two communicating agents: one writer that performs put and one reader that performs get. The buffer then possesses the property that the reader reads the elements in the same order as the writer wrote them. More precisely, the channel alive(xs,ys) is used internally to encode the state of the buffer: the ordered list of elements, seen from the reader’s side (get), is ys@List.rev xs. One may observe that calls on get are blocked when the buffer is empty, i.e. when alive([],[]) is active.
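
To illustrate the FIFO behavior concretely, here is a small sketch (not part of the original session): three puts followed by three gets return the elements in insertion order.

let b = create_buffer () in
b.put 1 ; b.put 2 ; b.put 3 ;
print_int (b.get ()) ;   (* prints 1: ys is empty, so the second clause reverses xs *)
print_int (b.get ()) ;   (* prints 2 *)
print_int (b.get ())     (* prints 3 *)

After the three puts, the state is alive([3;2;1],[]); the first get triggers the reversal clause, turning the state into alive([],[1;2;3]), from which elements come out in order.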

1.6  A serious example: connecting a producer with consumers

To demonstrate that our buffer is useful, we consider a simple producer–consumer(s) problem: our producer feeds our consumer with the integers 1, 2, 3, 4, 5.

#type ’a consumer = { send : ’a -> unit ; }
 ;;
type ’a consumer = { send : ’a -> unit; }
#let producer c = for k = 1 to 5 do c.send k done
 ;;
val producer : int consumer -> unit = <fun>

And here is a simple consumer that prints its input on the console.

#let consumer = {send=print_int}
 ;;
val consumer : int consumer = {send = <fun>}

We connect the producer and the consumer by the means of ordinary function application.

#producer consumer
 ;;
- : unit = ()
12345

We introduced the record type ’a consumer to get the compiler to issue significant types. However, a consumer is nothing more than a function of type ’a -> unit. Thus, we have just explained function application in a rather contrived manner.

Now we start complicating things: we want to feed two consumers. Those consumers are “concurrent agents” and we encode them as join definitions.

#let fast_consumer =
   let fast_cons x xs =  Thread.delay 0.1 ; x::xs in
   def put(x) & state(xs) =
     print_string ("("^string_of_int x) ;
     let xs = fast_cons x xs in
     print_string ")" ;
     state(xs) & reply to put in
  spawn state([]) ;
  { send=put }

 and slow_consumer =
   let slow_cons x xs = Thread.delay 0.3 ; x::xs in
   def put(x) & state(xs) =
     print_string ("<"^string_of_int x) ;
     let xs = slow_cons x xs in
     print_string ">" ;
     state(xs) & reply to put in
   spawn state([]) ;
   { send=put; }
 ;;
val fast_consumer : int consumer = {send = <fun>}
val slow_consumer : int consumer = {send = <fun>}

Both consumers accept integers on the synchronous channel put and accumulate integers as a list in some internal channel state. However, the fast consumer builds lists at about three times the speed of the slow consumer. We assume that element order is critical: consumers must compute the reversed list of the producer output. Consumers also differ marginally as regards console output: the fast consumer prints a received value x as (x), while the slow consumer prints <x>.

We do not intend the consumers to compete for the producer output. Instead, each should get its own copy. Of course, we do not wish to change the producer. Hence, we insert the following duplicator between the producer and the consumers.

#let dup c1 c2 =
   let dup_send x = c1.send x ; c2.send x in
   { send=dup_send }
 ;;
val dup : ’a consumer -> ’a consumer -> ’a consumer = <fun>

Clearly, given the type above, the producer should not notice that its messages are duplicated: the combined consumers act as a single consumer.

We marginally alter the producer, so that it shows timing information.

#let producer c =
   let t_start = Unix.gettimeofday () in
   for k = 1 to 5 do c.send k done ;
   let t_end = Unix.gettimeofday () in
   print_string (Printf.sprintf "{Time elapsed: %0.2f}" (t_end -. t_start))
 ;;
val producer : int consumer -> unit = <fun>

And then we connect the producer and both consumers.

#producer (dup slow_consumer fast_consumer)
 ;;
- : unit = ()
<1>(1)<2>(2)<3>(3)<4>(4)<5>(5){Time elapsed: 2.00}

Integers are consumed (and printed) in the right order. However, there is no concurrency at all: the consumers execute sequentially and the producer waits for both of them.

A first idea for having the consumers execute concurrently is to send the integers to them asynchronously. Ideally, we should change the type of consumers to {send : ’a Join.chan}. So as not to rewrite everything, we write a wrapper that simply spawns calls to the send function of consumers.

#let asynchronyze c = { send=(fun x -> spawn (c.send(x); 0)) }
 ;;
val asynchronyze : ’a consumer -> ’a consumer = <fun>
#let async_slow = asynchronyze slow_consumer
 and async_fast = asynchronyze fast_consumer
 ;;
val async_slow : int consumer = {send = <fun>}
val async_fast : int consumer = {send = <fun>}
#producer (dup async_slow async_fast)
 ;;
- : unit = ()
<2(1{Time elapsed: 0.00})(2)(5><5)(4)(3)><3><4><1>

Now the consumers execute concurrently and the producer speed is unconstrained. From the point of view of the producer, we are connected to a fast consumer. From the point of view of a consumer, we select integers at random, amongst a soup of asynchronous messages. The consumers can execute concurrently, because there are messages for both of them in the soup simultaneously. However, as a result of all messages being sent asynchronously, the ordering of the producer output is lost.

To recover that ordering while preserving concurrent execution, we can use buffers.

#let bufferize c =
   let buff = create_buffer () in
   def transmit() = c.send(buff.get()) ; transmit() in
   spawn transmit() ;
   { send=buff.put }
 ;;
val bufferize : ’a consumer -> ’a consumer = <fun>

The bufferize function takes a consumer as its argument and returns another consumer. The returned consumer simply puts messages into an internal buffer buff. A concurrent agent transmit extracts the messages from the buffer and forwards them to the initial consumer c.

The bufferizing duplicator inserts a buffer in front of each consumer.

#let dup_buffered c1 c2 = dup (bufferize c1) (bufferize c2)
 ;;
val dup_buffered : ’a consumer -> ’a consumer -> ’a consumer = <fun>
#producer (dup_buffered slow_consumer fast_consumer)
 ;;
- : unit = ()
(1<1{Time elapsed: 0.00})(2)(3>)(4<2)(5)><3><4><5>

From the point of view of the producer, a buffer is a fast consumer, hence the producer still runs at full speed. But now, between the producer and a consumer, there is a buffer that preserves ordering, in place of a soup that mixes everything. Additionally, the consumer is now a concurrent agent that attempts to get a new value from the buffer as soon as it is done with the previous value. As a result, the consumers execute concurrently, and consume the producer output in the issuing order.

We may feel satisfied, but we are not done yet: we may now consider that the producer is too fast and that it fills the buffers needlessly. It may be a good idea for the producer to wait for one message to be consumed by someone before issuing the next message. That way, we avoid filling up the buffers. To that aim, we attach a fresh tick channel to each message issued by the producer, and then wait for a message on tick.

#let add_tick c =
   let send_ticked x =
     def wait() & tick() = reply to wait in
     c.send (tick,x) ;
     wait () in
   { send=send_ticked }
 ;;
val add_tick : (unit Join.chan * ’a) consumer -> ’a consumer = <fun>
#let ticked_producer c = producer (add_tick c)
 ;;
val ticked_producer : (unit Join.chan * int) consumer -> unit = <fun>

At the consuming end, we remove the tick, transmit the actual message to the actual consumer, and then, once the message is accepted, issue a message on tick.

#let remove_tick c =
   let send_unticked (tick,x) = c.send x ; spawn tick () in
   { send=send_unticked }
 ;;    
val remove_tick : ’a consumer -> (unit Join.chan * ’a) consumer = <fun>
#let ticked_slow = remove_tick slow_consumer
 and ticked_fast = remove_tick fast_consumer
 ;;
val ticked_slow : (unit Join.chan * int) consumer = {send = <fun>}
val ticked_fast : (unit Join.chan * int) consumer = {send = <fun>}

Then, we combine the ticked producer and the ticked consumers as we did for unticked ones.

#ticked_producer (dup_buffered ticked_slow ticked_fast)
 ;;
- : unit = ()
<1(1)(2)(3><2)(4)(5){Time elapsed: 0.50}><3><4><5>

As we can see, producer speed is now more or less the speed of the fast consumer.

1.7  Modules

JoCaml relies on the same module system as Objective Caml. For example, a Semaphore module can be defined with the following interface.

#module type Semaphore =
   sig 
     type t
     val create: int -> t
     val p: t -> unit
     val v: t -> unit
   end
 ;;
module type Semaphore =
  sig type t val create : int -> t val p : t -> unit val v : t -> unit end

The type t of a semaphore is an abstract type. Function create n returns a new semaphore initialized with the value n. p and v are the atomic functions to manipulate semaphores.

The implementation of semaphores can be done as that of locks (see section 1.4.1).

#module Semaphore: Semaphore =
   struct 
     type t = (unit -> unit) * (unit -> unit)

     let create n =
       def p() & s() = reply to p
       and v() = s() & reply to v in
       for i = 1 to n do spawn s() done;  
       (p,v)

     let p (prolagen,_) = prolagen()
     and v (_,verhogen) = verhogen()

   end
 ;;
module Semaphore : Semaphore

And here is an example of using the module Semaphore.

#let s = Semaphore.create 2
 ;;
val s : Semaphore.t = <abstr>
#def agent(x) =
   def loop(n,k) = match n with
     | 0 -> k()
     | n -> print_int x ; loop(n-1,k) in
   Semaphore.p s ;
   loop(5,def k() = Semaphore.v s ; 0 in k)
 ;;
val agent : int Join.chan = <abstr>
#spawn agent(1) & agent(2) & agent(3)
 ;;
- : unit = ()
222221111133333

As to semaphore semantics, in the output above there must be an “initial value” (maybe 1) and a “final value” (maybe 3) such that all instances of the initial value appear before the instances of the final value.

1.8  A word on typing

The JoCaml type system is derived from the ML type system and it should be no surprise to ML programmers. The key point in typing à la ML is parametric polymorphism. For instance, here is a polymorphic identity function:

#def id(x) = reply x to id
 ;;
val id : ’a -> ’a = <fun>

The type for id contains a type variable “’a” that can be instantiated to any type each time id is actually used. Such a type variable is a generalized type variable. For instance, in the following program, variable “’a” is instantiated successively to int and string:

#let i = id(1) and s = id("coucou")
 ;;
val i : int = 1
val s : string = "coucou"
#print_int i ; print_string s
 ;;
- : unit = ()
1coucou

In other words, the first occurrence of id above has type int -> int, while the second has type string -> string. Experienced ML programmers may wonder how the JoCaml type system achieves mixing parametric polymorphism and mutable data structures. There is no miracle here. Consider, again, the JoCaml encoding of a reference cell:

#def state(x) & get() = state(x) & reply x to get
 or  state(_) & set(x) = state(x) & reply to set
 ;;
val get : unit -> ’_a = <fun>
val state : ’_a Join.chan = <abstr>
val set : ’_a -> unit = <fun>

The type variable “’_a” that appears inside the types for state, get and set is prefixed by an underscore “_”. Such type variables are non-generalized type variables that are instantiated only once. That is, all the occurrences of state must have the same type. Moreover, once “’_a” is instantiated with some type, this type replaces “’_a” in all the types where “’_a” appears (here, the types for get and set). This wide-scope instantiation guarantees that the various port names whose type contains “’_a” (state, get and set here) are used consistently.

More specifically, “’_a” can be instantiated to the type int by sending the message 0 on state. Then, the type of get is unit -> int in the rest of the program, as shown by the type of x below. As a consequence, the following program does not type-check and a runtime type error (printing an integer while believing it is a string) is avoided:

#def state(x) & get() = state(x) & reply x to get
 or  state(_) & set(x) = state(x) & reply to set
 ;;
val get : unit -> ’_a = <fun>
val state : ’_a Join.chan = <abstr>
val set : ’_a -> unit = <fun>
#spawn state(0)
 ;;
- : unit = ()
#let x = get ()
 ;;
val x : int = 0
#print_string x
 ;;
Error: This expression has type int but an expression was expected of type
         string

Non-generalized type variables appear when the types of several co-defined port names share a type variable. Such a type variable is not generalized.

#def port(p) & arg(x) = p x
 ;;
val port : ’_a Join.chan Join.chan = <abstr>
val arg : ’_a Join.chan = <abstr>

A workaround is to encapsulate the faulty names into a function definition. This restores polymorphism.

#let create_it () = def port(p) & arg(x) = p x in  port,arg
 ;;
val create_it : unit -> ’a Join.chan Join.chan * ’a Join.chan = <fun>

Non-generalized type variables also appear in the types of the identifiers defined by a value binding.

#let (p1,a1),(p2,a2) = create_it (), create_it ()
 ;;
val p1 : ’_a Join.chan Join.chan = <abstr>
val a1 : ’_a Join.chan = <abstr>
val p2 : ’_a Join.chan Join.chan = <abstr>
val a2 : ’_a Join.chan = <abstr>
#spawn p1(echo) & p2(echo_string)
 ;;
- : unit = ()
#(a1,a2)
 ;;
- : int Join.chan * string Join.chan = (<abstr>, <abstr>)
#spawn a1(1) & a2("coucou")
 ;;
- : unit = ()
coucou1

It is interesting to notice that invoking create_it () twice yields two different sets of port and arg port names, whose types contain different type variables (unfortunately, all such variables are printed as ’_a). Namely, once the variables are instantiated by sending messages on p1 and p2, the types of a1 and a2 are instantiated accordingly. Thereby, programmers make explicit the different type instantiations that are performed silently by the compiler in the case of generalized type variables.

1.9  Exceptions

Since processes are mapped to several threads at run-time, it is important to specify their behaviors in the presence of exceptions.

Exceptions behave as in Objective Caml for Objective Caml expressions. If an exception is not caught in the expression, the behavior depends on the way the process has been spawned. In the following, processes that must reply to a synchronous channel are called synchronous processes and the others asynchronous processes.

If the process is asynchronous, the exception is printed on the error output and the asynchronous process terminates. No other process is affected.

#spawn begin
     (failwith "Bye bye"; 0)
   & (for i = 1 to 10 do print_int i done; 0) 
 end
 ;;
- : unit = ()
Thread 23 killed on uncaught exception Failure("Bye bye")
12345678910

To avoid the error message, one can raise the Join.Exit exception. Then, the process that commits suicide does so silently.

#spawn begin
     (raise Join.Exit ; 0)
   & (for i = 1 to 10 do print_int i done; 0) 
 end
 ;;
- : unit = ()
12345678910

An exception raised in a process that includes the reply construct behaves differently: the process waiting for the result will receive the exception, which will be propagated as in an Objective Caml function.

#def die() = failwith "die"; reply to die
 ;;
val die : unit -> unit = <fun>
#try
   die()
 with
   Failure msg -> print_string (Printf.sprintf "dead on ’%s’\n" msg)
 ;;
- : unit = ()
dead on ’die’

Synchronous processes may be in charge of replying to more than one synchronous call at the time when an exception is raised. In other words, there can be several reply constructs that are syntactically guarded by a shared expression that raises the exception. In such cases, the exception is duplicated and thrown to all the waiting threads, reversing joins into forks.

#def a() & b() = failwith "die"; reply to a & reply to b
 ;;
val a : unit -> unit = <fun>
val b : unit -> unit = <fun>
#spawn begin 
     ( (try a() with Failure _ -> print_string "a failed\n"); 0 )
   & ( (try b() with Failure _ -> print_string "b failed\n"); 0 )
 end
 ;;
- : unit = ()
a failed
b failed

Transmission of exceptions by reply constructs follows relatively straightforward rules. Let us first define a channel protect to control our experiments.

#def protect(name,c) =
   begin try c() ; print_string (name ^ ": success\n")
   with Failure _ ->  print_string (name ^ ": failure\n") end ;
   0
 ;;
val protect : (string * (unit -> ’a)) Join.chan = <abstr>

Then, the rule is as follows: basically, the exception is transmitted if the reply is to be executed after the exception, following standard evaluation rules. For instance, in “E ; P”, expression E evaluates before process P executes, while in “P1 & P2” processes P1 and P2 execute independently.

#def a() & b() = (failwith "die"; reply to a) & reply to b
 ;;
val a : unit -> unit = <fun>
val b : unit -> unit = <fun>
#spawn protect("a",a) & protect("b",b)
 ;;
- : unit = ()
a: failure
b: success

And here is another test, with three channels.

#def a() & b() & c() =
   reply to c &
   if failwith "die" then reply to a & reply to b
   else reply to b & reply to a
 ;;
val a : unit -> unit = <fun>
val b : unit -> unit = <fun>
val c : unit -> unit = <fun>
#spawn protect("a",a) & protect("b",b) & protect("c",c)
 ;;
- : unit = ()
a: failure
b: failure
c: success

1.10  A complete example: controlling several remote shell executions

1.10.1  Realistic timeouts

In section 1.4.2, we introduced the basics of programming a timeout in JoCaml. A first step toward a more realistic timeout is for the controlled computation to abort when the timeout expires. To that aim, the controlled computation must provide a means to kill itself. Here, we define an infinite loop that computes a function step at each iteration and that can be killed between two iterations.

#exception Killed
 ;;
exception Killed
#let create_loop (step) =
   def run() & ok() = ok() & reply step(); run() to run
   or  run() & killed() = reply raise Killed to run
   or  kill() & ok() = killed() in
   let loop () = spawn ok() ; run () in
   loop,kill
 ;;
val create_loop : (unit -> ’a) -> (unit -> ’b) * unit Join.chan = <fun>
#let run,kill = create_loop (fun () -> print_string "*"; Thread.delay 0.01)
 ;;
val run : unit -> ’a = <fun>
val kill : unit Join.chan = <abstr>

The channels ok and killed are used internally; they express the status of the computing agent. At a given time, there is a message pending on either ok or killed. In the first situation, computation can go on; in the second situation, computation is interrupted, and an exception is raised, as a reply to whoever called run. The computation is controlled from the outside by means of one function loop to compute a result (here, to loop forever), and of one asynchronous channel kill to stop computing.

The new timeout function is a small improvement over the previous one: when the delay has expired, we also kill the computation by sending a message on the adequate channel kill, which is passed to timeout for that purpose.

#let timeout t f x kill =
   def wait () & finished (r) = reply Some r to wait
   or wait () & timeout () = kill () & reply None to wait
   in
   spawn begin
     finished (f x) &
     begin Thread.delay t ; timeout() end
   end ;
   wait()
 ;;
val timeout : float -> (’a -> ’b) -> ’a -> unit Join.chan -> ’b option =
  <fun>
#match timeout 0.5 run () kill with
 | None -> print_string "Timeout! "
 | Some _ -> print_string "No timeout! "
 ;;
- : unit = ()
Thread 20 killed on uncaught exception Killed
******************************************Timeout! 

The message Thread X killed on uncaught exception Killed above is issued by the JoCaml system, as an indication that one of the underlying threads terminated on an abnormal condition. Here, the killed thread is in charge of executing the process finished (f x). However, the evaluation of f x does not result in a value to be sent on channel finished: instead, exception Killed is raised, and the thread in charge terminates abnormally, since there is no value to send.

To get rid of the message, one may use the Join.Exit exception (see Section 1.9). However, it may be advisable not to use a system exception in place of a user exception. Thus, one can also replace the simple process finished (f x) by the more complex one:

let r = try Some (f x) with Killed -> None in
match r with Some r -> finished(r) |  None -> 0
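
Putting the pieces together, a variant of timeout along those lines might read as follows (a sketch, not from the original session; the internal channels are those of the previous definition, with the guard around f x now absorbing Killed):

let timeout t f x kill =
  def wait () & finished (r) = reply Some r to wait
  or wait () & timeout () = kill () & reply None to wait
  in
  spawn begin
    begin
      (* absorb Killed instead of letting the thread die on it *)
      let r = try Some (f x) with Killed -> None in
      match r with Some r -> finished (r) | None -> 0
    end &
    begin Thread.delay t ; timeout () end
  end ;
  wait ()

When the computation is killed, no message is sent on finished; the caller then gets None through the timeout clause, and no thread terminates on an uncaught exception.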

1.10.2  Forking an Unix process under timeout control

The Unix command arch echoes a conventional string that describes the architecture of the machine, such as alpha, i686, etc. The Unix command rsh host cmd performs the remote execution of command cmd on host. In particular, rsh copies the standard output of the remote command to its own standard output. Hence, by issuing the command rsh host arch, we get the architecture of host. Using a bit of classical Unix programming, the following function forks a Unix process that performs the command arch on a remote host.

#let rsh_arch host = JoinProc.open_in "/usr/bin/rsh" [| host ; "arch" |]
 ;;
val rsh_arch : string -> int * in_channel = <fun>

The rsh_arch function is in charge of forking the concurrent Unix process that performs the remote arch command. To that end, it calls open_in from the standard library module JoinProc. Observe that the function rsh_arch returns a process id and an input (IO) channel. Thanks to the Unix plumbing that JoinProc.open_in performs, the output of the remote arch command can be read on this channel. Moreover, the process id is the information needed to kill a Unix process, as we should do when the timeout expires.

Now, to use the timeout function of the previous section, we disguise the remote call to arch as a pair of a “run” function and a “kill” channel.

#let create_arch host =
   let pid, inchan = rsh_arch host in
   def result(r) & wait() & ok () = reply r to wait
   or kill() & ok() =
      begin try Unix.kill pid Sys.sigkill with _ -> () end ; 0 in
   spawn begin
     ok() &
     let a = try Some (input_line inchan) with End_of_file -> None in
     close_in inchan ;
     match a,snd (Unix.waitpid [] pid) with
     | Some a,Unix.WEXITED 0 -> result(a)
     | _,_ -> 0
   end ;
   wait, kill
 ;;
val create_arch : string -> (unit -> string) * unit Join.chan = <fun>

The function create_arch first performs the remote call by calling rsh_arch. The “run” function is the synchronous wait channel. As shown by its synchronization pattern, it transmits one result (one message on the internal channel result) to its caller. The message on result is sent by another concurrent agent, which reads one line from the remote arch output, and then checks the proper termination of the forked command (by Unix.waitpid). Additionally, a kill channel offers the ability to kill the forked Unix process. Finally, thanks to the internal channel ok, either wait will return a valid value, or the forked process will be destroyed as the result of a timeout.

1.10.3  Collecting results

We intend to execute arch on several remote machines, all those executions being performed concurrently. Hence, we need a way to collect n results produced concurrently. The task of collecting those results is performed with the collectors of the standard library (see section 1.4.1).

#let collector n =
   Col.create
     (fun x xs -> match x with
     | Some x -> x::xs
     | None -> xs)
     [] n
 ;;
val collector : int -> (’a option, ’a list) Col.t = <fun>

A collector is set up for collecting n values sent on its collect channel. Values may be worth collecting or not (Some x or None), but n messages must be sent before the list of collected values can be returned by wait.

We now spawn n concurrent tasks, one per host in the list hosts.

#let archs hosts =
   let col = collector (List.length hosts) in
   List.iter
     (fun host ->
       let arch, kill = create_arch host in
       spawn begin match timeout 1.0 arch () kill with
       | Some a -> col.Col.collect(Some (host, a))
       | None ->
 	  prerr_endline ("Timeout for: "^host) ;
 	  col.Col.collect(None)
       end)
   hosts ; 
  col.Col.wait()
 ;;
val archs : string list -> (string * string) list = <fun>

And here we go:

#archs ["saumur"; "beaune" ; "yquem" ; "macao"]
 ;;
Timeout for: macao
Timeout for: beaune
- : (string * string) list = [("yquem", "i686"); ("saumur", "x86_64")]


Previous Up Next