This chapter presents the distributed and mobile features of JoCaml. JoCaml is specifically designed to provide a simple and well-defined model of distributed programming. Since the language relies entirely on asynchronous message-passing, programs can either be run on a single machine (as described in the previous sections) or be executed in a distributed manner on several machines.
In this section, we describe support for execution on several machines. To this end, we interleave a description of the model with a series of examples that illustrate the use of the corresponding primitives.
The execution of JoCaml programs can be distributed among numerous machines, possibly running different operating systems; new machines may join or leave the computation. At any time, every process or expression is running on a given machine. In this implementation, the runtime support consists of several system-level processes that communicate using TCP/IP over the network.
In JoCaml, the execution of a process (or an expression) does not usually depend on its location. Indeed, running processes P and Q on two different machines is equivalent to running the compound process (P & Q) on a single machine. In particular, the scope of defined names and values does not depend on their location: whenever a port name appears in a process, it can be used to form messages (using the name as the address, or as the message contents) without knowing whether this port name is defined locally or remotely. So far, locality is transparent, and programs can be written independently of their run-time distribution.
Of course, locality matters in some circumstances: side effects such as printing values on the local terminal depend on the current machine; efficiency can be affected, because sending a message over the network takes much longer than a local call; finally, the termination of a runtime affects all of its local processes.
An important issue when passing messages in a distributed system is whether the message contents are copied or passed by reference. This is the essential difference between functions and synchronous channels. When a function is sent to a remote machine, its code and the values of its free variables are also sent there, and any invocation is executed locally on the remote machine. When a synchronous port name is sent to a remote machine, only the name is sent, and invocations on this name are forwarded to the machine where the name is defined, much as in a remote procedure call. In the current implementation of JoCaml, however, passing a function in a distributed system is not yet implemented.
Since JoCaml has lexical scoping, programs executed on different machines do not initially share any port name; therefore, they would normally not be able to interact with one another. To bootstrap a distributed computation, it is necessary to exchange a few names; this is achieved with a built-in library module, the name server (Join.Ns). Once this is done, these first names can be used to communicate more names and to build more complex communication patterns.
The interface of the name server mostly consists of two functions to register and look up arbitrary values in a “global table” indexed by plain strings. For instance, the following program contains two processes running in parallel. One of them locally defines some resource (a function f that squares integers) and registers it under the string “square”. The other process is not within the scope of f; it looks up the value registered under the same string, locally binds it to sqr, then uses it to print something.
#spawn begin
  def f (x) = reply x*x to f in
  Join.Ns.register Join.Ns.here "square" (f: int -> int);
  0
end
;;
- : unit = ()
#spawn begin
  let sqr = (Join.Ns.lookup Join.Ns.here "square" : int -> int) in
  print_int (sqr 2);
  0
end
;;
- : unit = ()
4
The lookup and register functions take a name server as their first argument. Here, both processes are executed in the same runtime, so they have direct access to the local name server (Join.Ns.here).
Communications through the name server are untyped: the type annotation given at lookup time is trusted, not checked. This weakness calls for careful programming discipline.
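To make the “global table” idea concrete, here is a small plain-OCaml sketch of a string-indexed registry. It is a toy under stated assumptions, not Join.Ns: register and lookup are hypothetical local functions, the table lives in a single process, and values are marshaled to mimic the copying that happens between runtimes (so it handles plain data only, not channels). Note that the type of a looked-up value comes solely from the caller's annotation.

```ocaml
(* Toy, single-process stand-in for a name server (NOT Join.Ns):
   a table from plain strings to marshaled values. *)
let table : (string, string) Hashtbl.t = Hashtbl.create 16

(* Store a copy of [v] under [key]; marshaling mimics the copy made
   when a value crosses runtimes, so only plain data is supported. *)
let register (key : string) (v : 'a) : unit =
  Hashtbl.replace table key (Marshal.to_string v [])

(* The result type is whatever the caller's annotation says:
   nothing checks it against the type of the registered value. *)
let lookup (key : string) : 'a =
  Marshal.from_string (Hashtbl.find table key) 0

let () =
  register "greeting" "hello";
  let s = (lookup "greeting" : string) in
  print_endline s
```

Writing (lookup "greeting" : int) would also compile here, which is exactly the untyped behavior described above.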
The runtimes that participate in a distributed computation are launched as independent executables, e.g. bytecode executables generated by the compiler and linked to the distributed runtime. A site (Join.Site) is associated with each runtime.
The following example illustrates a distributed computation with two machines. Let us assume that we have a single machine “here.inria.fr” that is particularly good at computing squares of integers; on this machine, we define a square function that also prints something when it is called (so that we can keep track of what is happening), and we register this function with key “square”:
#def f (x) =
  print_string ("["^string_of_int(x)^"] "); flush stdout;
  reply x*x to f
in
Join.Ns.register Join.Ns.here "square" f
;;
- : unit
#let wait =
  def x () & y () = reply to x in
  x
;;
val wait : unit -> unit
#let main =
  Join.Site.listen (Unix.ADDR_INET (Join.Site.get_local_addr(), 12345));
  wait()
;;
val main : unit
The function Join.Site.listen creates a socket that waits for connections to the local site. Here, the server waits for connections on the default Internet address of the host (Join.Site.get_local_addr() is the first address returned by Unix.gethostbyname) on port 12345. The call to the synchronous channel wait keeps the program running after the completion of all local statements, so that it can serve remote calls: since no message is ever sent on y, wait() never returns.
On machine here.inria.fr, we compile the previous program (p.ml) and we execute it:
here> jocamlc p.ml -o p.out
here> ./p.out
We also write a program that relies on the previous machine to compute squares; this program first looks up the name registered by here.inria.fr, then performs some computations and reports their results.
#let server =
  let server_addr = Unix.gethostbyname "here.inria.fr" in
  Join.Site.there (Unix.ADDR_INET(server_addr.Unix.h_addr_list.(0),12345))
;;
val server : Join.Site.t
#let ns = Join.Ns.of_site server
;;
val ns : Join.Ns.t
#let sqr = (Join.Ns.lookup ns "square": int -> int)
;;
val sqr : int -> int
#let log s x =
  print_string ("q: "^s^" = "^string_of_int(x)^"\n"); flush stdout
;;
val log : string -> int -> unit
#let rec sum s n =
  if n = 0 then s else sum (s+sqr(n)) (n-1)
;;
val sum : int -> int -> int
#log "sqr 3" (sqr 3);
log "sum 5" (sum 0 5)
;;
- : unit
This program first connects to here.inria.fr:12345 with the function Join.Site.there, which returns the abstract value server representing the JoCaml runtime on here.inria.fr, and then gets the name server of that runtime with the function Join.Ns.of_site (alternatively, the function Join.Ns.of_sockaddr obtains the name server directly from a socket address). Then it defines sqr as the square channel of here.inria.fr. The sum function computes a sum of squares using the sqr function.
On another machine there.inria.fr, we compile and run our second program (q.ml):
there> jocamlc q.ml -o q.out
there> ./q.out
What is the outcome of this computation? Whenever a process defines new port names, this is done locally, that is, their guarded processes are executed at the same place as the defining process. Here, every call to square in sqr 3 and within sum 5 is evaluated as a remote function call to here.inria.fr. The actual location of processes is revealed by the print_string statements: f (aliased to sqr on there.inria.fr) always prints on machine here, and log always prints on machine there, no matter where the messages are posted.
The result on machine here.inria.fr is:
[3] [5] [4] [3] [2] [1]
while the result on machine there.inria.fr is:
q: sqr 3 = 9
q: sum 5 = 55
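The order of the server-side prints follows from the evaluation order of the client program. The plain-OCaml sketch below replays it in a single process, assuming a local function sqr that records each argument in place of the remote square channel:

```ocaml
(* Single-process replay of the example. [sqr] stands for the remote
   "square" channel: it records each argument, as here.inria.fr prints
   "[x] " on its own console. *)
let server_log = ref []

let sqr x =
  server_log := x :: !server_log;
  x * x

let rec sum s n = if n = 0 then s else sum (s + sqr n) (n - 1)

(* [log] prints on the client side, like on there.inria.fr. *)
let log s x = Printf.printf "q: %s = %d\n" s x

let () =
  log "sqr 3" (sqr 3);
  log "sum 5" (sum 0 5);
  (* What the server would have printed, in arrival order. *)
  List.iter (Printf.printf "[%d] ") (List.rev !server_log);
  print_newline ()
```

It prints the two q: lines followed by [3] [5] [4] [3] [2] [1], matching the outputs shown above.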
When a distributed application runs, some sites can terminate, fail, or become unreachable. Hence, sending a message on a synchronous channel of a site that is not available raises the exception Join.Exit.
Let us define a function that tests whether a site is available. On the machine here.inria.fr, we define two synchronous channels: living, which always returns true, and kill, which stops the JoCaml runtime.
#def living () = reply true to living
;;
val living : unit -> bool
#def kill () & wait () = reply to kill & reply to wait
;;
val kill : unit -> unit
val wait : unit -> unit
#let main =
  Join.Ns.register Join.Ns.here "living" (living: unit -> bool);
  Join.Ns.register Join.Ns.here "kill" (kill: unit -> unit);
  Join.Site.listen (Unix.ADDR_INET (Join.Site.get_local_addr(), 12345));
  wait()
;;
val main : unit
On the machine there.inria.fr, we get the two synchronous channels defined on here.inria.fr and define a function is_available that calls living. If living returns a value, then the JoCaml runtime on here.inria.fr is available; otherwise, the call to living raises the exception Join.Exit.
#let ns =
  let server_addr = Unix.gethostbyname "here.inria.fr" in
  Join.Ns.of_sockaddr (Unix.ADDR_INET(server_addr.Unix.h_addr_list.(0),12345))
;;
val ns : Join.Ns.t
#let living = (Join.Ns.lookup ns "living" : unit -> bool)
and kill = (Join.Ns.lookup ns "kill" : unit -> unit)
;;
val living : unit -> bool
val kill : unit -> unit
#let is_available () = try living () with Join.Exit -> false
;;
val is_available : unit -> bool
#let main =
  if is_available () then print_string "OK ! " else print_string "KO ! ";
  kill ();
  if is_available () then print_string "OK ! " else print_string "KO ! "
;;
val main : unit
The output of this program on there.inria.fr is:
OK ! KO !
The first time is_available is called, the JoCaml runtime on here.inria.fr is running. Then the call to kill stops the execution of this runtime, so that the second call to is_available returns false.
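The same try ... with pattern can be exercised without a network. In this sketch, which only simulates the remote runtime, Unreachable is a hypothetical exception standing for Join.Exit, and living and kill are local functions standing for the two remote channels:

```ocaml
(* Hypothetical stand-ins: [Unreachable] plays the role of Join.Exit,
   and [server_up] the state of the runtime on here.inria.fr. *)
exception Unreachable

let server_up = ref true

(* Stand-in for the remote channel "living". *)
let living () = if !server_up then true else raise Unreachable

(* Stand-in for the remote channel "kill". *)
let kill () = server_up := false

(* Same shape as in the JoCaml client: unreachable means not available. *)
let is_available () = try living () with Unreachable -> false

let () =
  print_string (if is_available () then "OK ! " else "KO ! ");
  kill ();
  print_string (if is_available () then "OK ! " else "KO ! ");
  print_newline ()
```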
Another way to deal with site termination is to use the function Join.Site.at_fail. This function registers an asynchronous channel that is sent a message when a given site fails. Let us define a simple example where two runtimes run on the same computer. The first runtime waits for connections:
#let wait =
  def x () & y () = reply to x in
  x
;;
val wait : unit -> unit
#let main =
  Join.Site.listen (Unix.ADDR_INET (Join.Site.get_local_addr(), 12345));
  wait()
;;
val main : unit
The second runtime gets the site listening on port 12345 and registers the channel echo_failure on the failure of this site.
#let server = Join.Site.there (Unix.ADDR_INET(Unix.inet_addr_loopback, 12345))
;;
val server : Join.Site.t = <abstr>
#def echo_failure () =
  print_string "FAILURE!"; print_newline();
  0
in
Join.Site.at_fail server echo_failure
;;
- : unit = ()
#let wait =
  def x () & y () = reply to x in
  x
;;
val wait : unit -> unit = <fun>
#let main = wait()
;;
val main : unit
If we execute these two runtimes and kill the first one (with Ctrl-c for example), then the second runtime prints FAILURE!.
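Conceptually, at_fail attaches callbacks to a site and fires them once when the site is detected as failed. The sketch below models that bookkeeping in plain OCaml; the site type and the declare_failed function are hypothetical (in JoCaml, failure detection is done by the runtime, and the callback is an asynchronous channel rather than a function):

```ocaml
(* Toy model of the bookkeeping behind Join.Site.at_fail.
   [site] and [declare_failed] are hypothetical: in JoCaml the runtime
   detects failures itself, and the callback is an asynchronous channel. *)
type site = {
  mutable failed : bool;
  mutable on_fail : (unit -> unit) list;   (* registered callbacks *)
}

let make_site () = { failed = false; on_fail = [] }

(* Record [f] to be run when [s] fails. *)
let at_fail s f = s.on_fail <- f :: s.on_fail

(* Run every recorded callback once, in registration order. *)
let declare_failed s =
  if not s.failed then begin
    s.failed <- true;
    List.iter (fun f -> f ()) (List.rev s.on_fail);
    s.on_fail <- []
  end

let () =
  let server = make_site () in
  at_fail server (fun () -> print_string "FAILURE!"; print_newline ());
  declare_failed server;   (* e.g. the first runtime was killed *)
  declare_failed server    (* a second notice fires nothing *)
```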
We want to build a very simple chat system where each user can broadcast a message to all the others.
The client is the part of the system executed by each user. The behavior of the client is to read messages from the user and broadcast them to the other clients.
#let read broadcast =
  try
    while true do
      let msg = input_line stdin in
      broadcast msg
    done
  with End_of_file -> ()
;;
val read : (string -> 'a) -> unit = <fun>
This function returns when the end of standard input is reached.
To implement the broadcast, we first have to define a type that represents a user, and a data structure that stores the other users.
#type t_user = {
  name : string;
  site : Join.Site.t;
  write : string -> unit;
}
;;
type t_user = { name : string; site : Join.Site.t; write : string -> unit; }
#let get, add, remove =
  def state(x) & get() = state(x) & reply x to get
  or state(x) & add(user) = state(user::x) & reply to add
  or state(x) & remove(s) =
       state(List.filter (fun user -> not (Join.Site.equal user.site s)) x)
  in
  spawn state([]);
  get, add, remove
;;
val get : unit -> t_user list = <fun>
val add : t_user -> unit = <fun>
val remove : Join.Site.t Join.chan = <abstr>
Each user is described by a name, the site where the user's client runs, and a channel that writes on the console of that client. The set of users is manipulated through the channels get, add and remove: get returns the list of the other users, add puts a new user into the set, and remove removes the users running on a given site. Notice that, to test the equality of two sites, we have to use the function Join.Site.equal: = cannot be used, and == may return a wrong result.
A simple way to broadcast a message is to send it to each user sequentially.
#let seq_broadcast msg =
  List.iter (fun user -> try user.write msg with _ -> ()) (get())
;;
val seq_broadcast : string -> unit = <fun>
We can also implement a parallel broadcast using the countdown introduced in section 1.4.1.
#let create_countdown n =
  def count(n) & tick() = count(n-1)
  or count(0) & wait() = reply to wait
  in
  spawn count(n);
  tick, wait
;;
val create_countdown : int -> unit Join.chan * (unit -> unit) = <fun>
#let par_broadcast msg =
  let others = get() in
  let tick, wait = create_countdown (List.length others) in
  List.iter
    (fun user ->
      spawn begin
        begin try user.write msg with _ -> () end;
        tick()
      end)
    others;
  (* block until every spawned send has called tick *)
  wait()
;;
val par_broadcast : string -> unit = <fun>
Here, the broadcast terminates when all the messages have been sent. Another solution, which does not wait for all the messages to be sent but still preserves their order, is to use the buffer introduced in section 1.5.3. We define a function that replaces the write channel of a user by a buffered write channel.
#type 'a buffer = {
  put : 'a -> unit;
  get : unit -> 'a;
}
;;
type 'a buffer = { put : 'a -> unit; get : unit -> 'a; }
#let create_buffer () =
  def alive (xs,y::ys) & get() = alive(xs,ys) & reply y to get
  or alive(_::_ as xs,[]) & get() = alive([], List.rev xs) & reply get() to get
  or alive(xs,ys) & put(x) = alive(x::xs,ys) & reply to put
  in
  spawn alive([],[]);
  {put=put; get=get;}
;;
val create_buffer : unit -> 'a buffer = <fun>
#let bufferize user =
  let buff = create_buffer () in
  def transmit() = user.write(buff.get()); transmit() in
  spawn transmit ();
  { user with write = buff.put }
;;
val bufferize : t_user -> t_user = <fun>
Bufferized users can be used with both broadcast definitions.
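The alive(xs, ys) message in create_buffer carries a functional FIFO queue made of two lists. The following plain-OCaml sketch isolates that data structure, assuming the JoCaml clauses map onto put and get as indicated in the comments; where the JoCaml get would block on an empty buffer, this version returns None:

```ocaml
(* Plain-OCaml model of the state carried by alive(xs, ys):
   [xs] holds recently put elements (newest first),
   [ys] holds elements ready to be taken (oldest first). *)
type 'a queue = { xs : 'a list; ys : 'a list }

let empty = { xs = []; ys = [] }

(* Clause alive(xs,ys) & put(x): push on the front of xs. *)
let put x q = { q with xs = x :: q.xs }

(* Clauses alive(xs,y::ys) & get() and alive(_::_ as xs,[]) & get():
   take from ys, refilling it by reversing xs when it is empty.
   Returns None where the JoCaml get would block. *)
let rec get q =
  match q with
  | { ys = y :: rest; _ } -> Some (y, { q with ys = rest })
  | { xs = (_ :: _ as xs); ys = [] } -> get { xs = []; ys = List.rev xs }
  | { xs = []; ys = [] } -> None

let () =
  let q = empty |> put "a" |> put "b" |> put "c" in
  let rec drain q =
    match get q with
    | None -> print_newline ()
    | Some (x, q) -> print_string (x ^ " "); drain q
  in
  drain q
```

Each element is reversed at most once, so put and get take amortized constant time while preserving the order in which messages were put.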
We want to remove a user automatically when it becomes unreachable. To do so, we use the Join.Site.at_fail function introduced in section 2.1.3 to fire a channel that removes the unreachable user from the set of users.
#let at_fail user =
  def remove_user() =
    print_endline (user.name ^ " leaves the room ...");
    remove user.site
  in
  Join.Site.at_fail user.site remove_user
;;
val at_fail : t_user -> unit = <fun>
Now, we define how to add a user to the set of users known by the client. The channel new_user adds the user to this set and records the behavior to execute when the user becomes unreachable.
#def new_user(user_there) =
  let user_there = bufferize(user_there) in
  print_endline (user_there.name ^ " is in the room ...");
  add user_there;
  at_fail user_there;
  0
;;
val new_user : t_user Join.chan = <abstr>
To create a connection, the clients need to exchange their user information (a value of type t_user). This value can be created as follows:
#let name_here = Unix.gethostname()
;;
val name_here : string = "chianti"
#def write_here(msg) = reply print_endline msg to write_here
;;
val write_here : string -> unit = <fun>
#let here = { name = name_here; site = Join.Site.here; write = write_here; }
;;
val here : t_user = {name = "chianti"; site = <abstr>; write = <fun>}
Here, we define only one write channel on the client, shared by all the other users. As a consequence, we cannot tell which user sent a message on this channel.
Another solution is to create one channel per user and to prefix each message with the name of that user:
#let write_from name =
  def write_here(msg) = reply print_endline (name^"> "^msg) to write_here in
  write_here
;;
val write_from : string -> string -> unit = <fun>
#let make_here name_there = {
  name = name_here;
  site = Join.Site.here;
  write = write_from name_there;
}
;;
val make_here : string -> t_user = <fun>
Let us now see how two clients can exchange their user information. Suppose that a client B wants to build a symmetric connection with a client A: A must be added to the set of users of B, and B must be added to the set of users of A. The protocol is the following. B sends its name (name_there) and a channel (connect_there) to A through a listen channel on A. A then sends its own user information to B over connect_there, and obtains B's user information in return.
So the listen channel (on A) and the connect channel (on B) are defined as follows:
#def listen(name_there, connect_there) =
  let here = {
    name = name_here;
    site = Join.Site.here;
    write = write_from name_there;
  } in
  let user_there = connect_there(here) in
  new_user(user_there)
;;
val listen : (string * (t_user -> t_user)) Join.chan = <abstr>
#def connect_here(user_there) =
  let here = {
    name = name_here;
    site = Join.Site.here;
    write = write_from user_there.name;
  } in
  reply here to connect_here & new_user(user_there)
;;
val connect_here : t_user -> t_user = <fun>
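Leaving the network aside, the handshake between listen and connect_here can be replayed in a single process. In this sketch, the user and client types are simplified, hypothetical stand-ins for t_user and for each client's set of users; it only checks that, after B sends on A's listen channel, each client knows the other:

```ocaml
(* Simplified, hypothetical stand-ins for t_user and for each client's
   set of users; no network, no channels. *)
type user = { name : string }

type client = { me : user; mutable others : user list }

let new_client name = { me = { name }; others = [] }

(* On B: plays the role of connect_here. Records A, replies with B. *)
let connect_here b user_there =
  b.others <- user_there :: b.others;
  b.me

(* On A: plays the role of listen. Calls B back through [connect_there],
   gets B's information in return, and records B. In JoCaml,
   [name_there] is used to build A's write channel for B. *)
let listen a (_name_there, connect_there) =
  let user_there = connect_there a.me in
  a.others <- user_there :: a.others

let names c = List.map (fun u -> u.name) c.others

let () =
  let a = new_client "A" and b = new_client "B" in
  (* B initiates the exchange by sending on A's listen channel. *)
  listen a (b.me.name, connect_here b);
  Printf.printf "A knows [%s], B knows [%s]\n"
    (String.concat "; " (names a)) (String.concat "; " (names b))
```

A single message from B suffices: the reply to connect_there carries B's information back, so the connection is symmetric after one round trip.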
The listen channel is exported through the name server, so that other clients can create connections.
#type t_connect = t_user -> t_user
;;
type t_connect = t_user -> t_user
#type t_listen = (string * t_connect) Join.chan
;;
type t_listen = (string * t_connect) Join.chan
#let () = Join.Ns.register Join.Ns.here "listen" (listen: t_listen)
;;
To build a connection to a given site, we define the channel connect_to as follows:
#def connect_to(site) =
  let ns = Join.Ns.of_site site in
  let listen_there = (Join.Ns.lookup ns "listen" : t_listen) in
  listen_there(name_here, connect_here)
;;
val connect_to : Join.Site.t Join.chan = <abstr>
The last step in building a chat room is to know which sites participate in the chat. We use a centralized approach, where each site registers with a server and gets in return the list of the sites already registered.
Let’s define the server part:
#let register =
  def state(xs) & get_and_add(x) =
        state(x::xs) & reply xs to get_and_add &
        begin Join.Site.at_fail x (def rm() = remove(x) in rm); 0 end
  or state(xs) & remove(x) =
        state(List.filter (fun x' -> not (Join.Site.equal x x')) xs)
  in
  spawn state [];
  get_and_add
;;
val register : Join.Site.t -> Join.Site.t list = <fun>
#let server () =
  Join.Ns.register Join.Ns.here "register"
    (register: Join.Site.t -> Join.Site.t list);
  Join.Site.listen (Unix.ADDR_INET (Join.Site.get_local_addr(), 12345));
  def wait() & abs() = reply to wait in
  wait()
;;
val server : unit -> unit = <fun>
The register channel is backed by a shared data structure that stores the registered sites. A call to register adds a new site to this structure and returns the previously registered sites. Unreachable sites are automatically removed from the set, thanks to the behavior recorded with Join.Site.at_fail.
The server function exports the register channel, calls Join.Site.listen and waits forever.
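The state handled by register behaves like a list that is extended on each call and filtered on failures. A single-threaded plain-OCaml sketch of that logic, assuming sites are represented by strings (so that (=) can replace Join.Site.equal), is:

```ocaml
(* Single-threaded model of the server state. Sites are represented by
   strings (an assumption), so (=) can replace Join.Site.equal. *)
let registered : string list ref = ref []

(* get_and_add: record [x] and return the sites registered before it. *)
let get_and_add x =
  let previous = !registered in
  registered := x :: previous;
  previous

(* remove: what the at_fail callback triggers when [x] becomes unreachable. *)
let remove x = registered := List.filter (fun x' -> x' <> x) !registered

let () =
  let show l = print_endline ("[" ^ String.concat "; " l ^ "]") in
  show (get_and_add "s1");   (* first client: nobody to connect to *)
  show (get_and_add "s2");   (* second client learns about s1 *)
  remove "s1";               (* s1 fails *)
  show (get_and_add "s3")    (* third client only learns about s2 *)
```

In JoCaml, the join pattern on state makes get_and_add and remove atomic with respect to each other; this sequential version needs no such care.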
We now define the client part:
#let client server_site =
  let ns = Join.Ns.of_site server_site in
  let register = (Join.Ns.lookup ns "register" : Join.Site.t -> Join.Site.t list) in
  let others = register Join.Site.here in
  List.iter (fun site -> spawn connect_to site) others;
  read par_broadcast
;;
val client : Join.Site.t -> unit = <fun>
The client is parameterized by the site of the server. It gets the register channel of the server, registers itself and obtains the list of the other sites, tries to connect to each of these sites, and finally executes the read function.
Finally, the main function chooses the behavior to execute depending on the command-line options.
The current implementation of JoCaml has some limitations. Most of them come from the marshaling of messages.
Code mobility is not yet implemented: a function cannot be sent on a distributed channel. We illustrate this point with a JoCaml program server.ml, which awaits a function and a value and then applies the function to the value.
#def apply (f, x) = reply f x to apply in
Join.Ns.register Join.Ns.here "server" (apply: ((int -> int) * int -> int))
;;
- : unit = ()
#let wait =
  def x () & y () = reply to x in
  x
;;
val wait : unit -> unit = <fun>
#let main =
  Join.Site.listen (Unix.ADDR_INET (Join.Site.get_local_addr(), 12345));
  wait()
;;
val main : unit = ()
Then we define the following client program, which connects to the server and sends a function:
#let ns = Join.Ns.of_sockaddr (Unix.ADDR_INET (Join.Site.get_local_addr(), 12345))
;;
val ns : Join.Ns.t = <abstr>
#let main =
  let compute = (Join.Ns.lookup ns "server" : ((int -> int) * int -> int)) in
  print_int (compute((fun x -> x + 1), 2))
;;
val main : unit = ()
If we try to run this example, we obtain a runtime error:
here> jocamlc server.ml -o s.out
here> ./s.out
there> jocamlc client.ml -o c.out
there> ./c.out
Fatal error: exception Invalid_argument("output_value: functional value")
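The error message above is the one produced by OCaml's marshaler, which refuses closures unless explicitly told to serialize code pointers. Assuming JoCaml marshals messages with the standard Marshal machinery (as that message suggests), the same failure can be reproduced in plain OCaml; try_send is a hypothetical helper:

```ocaml
(* Hypothetical helper: try to marshal [v] as a message body. Without
   the Marshal.Closures flag, the marshaler rejects functional values
   with Invalid_argument. *)
let try_send v =
  match Marshal.to_string v [] with
  | _ -> "sent"
  | exception Invalid_argument msg -> "refused: " ^ msg

let () =
  print_endline (try_send (21, "plain data"));
  print_endline (try_send ((fun x -> x + 1), 2))
```

The first call succeeds and the second reports an Invalid_argument error. The Marshal.Closures flag is not a general workaround: closures marshaled that way can only be read back by the very same program.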
The behavior of a channel can differ depending on whether it is called from the same JoCaml runtime or from another one. We first define a simple synchronous channel that returns its argument.
#def id (x) = reply x to id
;;
val id : 'a -> 'a = <fun>
Then we send a mutable value on this channel from the same runtime:
#let r = ref 21
;;
val r : int ref = {contents = 21}
#let r' = id (r)
;;
val r' : int ref = {contents = 21}
#r := !r * 2; print_int !r'
;;
- : unit = ()
42
We can observe that r and r' are the same reference cell.
Now, we execute the same program, except that id is computed in another runtime, which is assumed to have registered it under the key “id” and to listen on port 12345:
#let ns = Join.Ns.of_sockaddr (Unix.ADDR_INET (Join.Site.get_local_addr(), 12345))
;;
val ns : Join.Ns.t = <abstr>
#let id = (Join.Ns.lookup ns "id" : (int ref -> int ref))
;;
val id : int ref -> int ref = <fun>
#let r = ref 21
;;
val r : int ref = {contents = 21}
#let r' = id (r)
;;
val r' : int ref = {contents = 21}
#r := !r * 2; print_int !r'
;;
- : unit = ()
21
Here, r' is not modified by r := !r * 2, because r and r' are two different reference cells: when a mutable value goes through the network, a new copy is created.
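The copying behavior can be reproduced in plain OCaml by marshaling a value and reading it back, which is, in effect, what happens when a message crosses runtimes. In this sketch, the identity function stands for a local call and the round trip through Marshal for a remote one:

```ocaml
let r = ref 21

(* Same runtime: the message is passed as is, so we get the same cell. *)
let r_local = (fun x -> x) r

(* Other runtime: the message is marshaled and rebuilt, giving a copy. *)
let r_copy : int ref = Marshal.from_string (Marshal.to_string r []) 0

let () =
  r := !r * 2;
  Printf.printf "local: %d, copy: %d\n" !r_local !r_copy
```

This prints local: 42, copy: 21, mirroring the two transcripts above.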
Exceptions behave in the same way:
#let ns = Join.Ns.of_sockaddr (Unix.ADDR_INET (Join.Site.get_local_addr(), 12345))
;;
val ns : Join.Ns.t = <abstr>
#let id = (Join.Ns.lookup ns "id" : (exn -> exn))
;;
val id : exn -> exn = <fun>
#exception E
;;
exception E
#try raise (id (E)) with
| E -> print_string "E is raised"
| _ -> print_string "An exception is raised"
;;
- : unit = ()
An exception is raised
A similar and more frequent situation occurs when the remote site raises an exception in response to a synchronous call. When all executions take place in the same runtime, everything works as expected.
#def f () = let _x = raise E in reply to f
;;
val f : unit -> unit = <fun>
#try f () with
| E -> print_string "E is raised"
| _ -> print_string "An exception is raised"
;;
- : unit = ()
E is raised
But now assume that the following code runs on a runtime A, and that another runtime B accesses the name server of A through ns.
#(* Runtime A *)
exception E
;;
exception E
#def raise_E () = let _x = raise E in reply to raise_E
;;
val raise_E : unit -> unit = <fun>
#Join.Ns.register Join.Ns.here "raise_E" (raise_E : unit -> unit)
;;
- : unit = ()
On runtime B, we also define exception E, retrieve channel raise_E from A, and then send a synchronous message on it.
#(* Runtime B *)
exception E
;;
exception E
#let raise_E = (Join.Ns.lookup ns "raise_E" : unit -> unit)
;;
val raise_E : unit -> unit = <fun>
#try raise_E() with
| E -> print_string "E is raised"
| _ -> print_string "An exception is raised"
;;
- : unit = ()
An exception is raised
This behavior can be summarized by saying that exceptions are generative: executing exception E yields a unique exception, and executing exception E twice (let alone on two different runtimes) yields two different exceptions, which happen to have the same name E. Maybe we can live with such a notion. Unfortunately, this is not the whole story: when they travel from one runtime to another, exceptions are copied, which contradicts generativity. Consider a similar example, where another synchronous channel is defined on A as follows:
#(* Runtime A *)
def raise_e(e) = let _x = raise e in reply to raise_e
;;
val raise_e : exn -> unit = <fun>
#Join.Ns.register Join.Ns.here "raise_e" (raise_e : exn -> unit)
;;
- : unit = ()
On B, we define the exception E, retrieve raise_e from the name server of A, and send E on channel raise_e.
#(* Runtime B *)
exception E
;;
exception E
#let raise_e = (Join.Ns.lookup ns "raise_e" : exn -> unit)
;;
val raise_e : exn -> unit = <fun>
#try raise_e(E) with
| E -> print_string "E is raised"
| _ -> print_string "An exception is raised"
;;
- : unit = ()
An exception is raised
And here, although the remote site A apparently raises an exception defined on site B, it in fact raises a copy of it. This copy is copied once more when it is transmitted back to site B. Since exception matching uses pointer equality, the handler on B distinguishes this copy of a copy from the original E.
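Both effects can be observed in plain OCaml. In the sketch below, modules A and B are hypothetical stand-ins for two runtimes, each declaring its own exception E, and a Marshal round trip stands for a transmission between runtimes. As the OCaml documentation of the Marshal module warns, unmarshaled exceptions cannot be matched against their constructors:

```ocaml
(* Two hypothetical modules standing for two runtimes, each declaring
   its own exception E. *)
module A = struct exception E end
module B = struct exception E end

(* Does [e] match A.E? *)
let is_a_e e = match e with A.E -> true | _ -> false

(* A Marshal round trip stands for a transmission between runtimes. *)
let transmit (e : exn) : exn = Marshal.from_string (Marshal.to_string e []) 0

let () =
  Printf.printf "A.E matches A.E: %b\n" (is_a_e A.E);
  (* generativity: same name, different exception *)
  Printf.printf "B.E matches A.E: %b\n" (is_a_e B.E);
  (* copying: a transmitted A.E no longer matches A.E *)
  Printf.printf "copy of A.E matches A.E: %b\n" (is_a_e (transmit A.E))
```

Only the first line should print true: generativity separates A.E from B.E, and copying separates A.E from its own transmitted copy.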
A specific mechanism partly solves the problem of exceptions raised by remote runtimes. It can be applied to any pre-existing exception with the declaration def exception, for instance on runtime B:
#(* Runtime B *)def exception E
;;
Then, we can try our two examples again, still on runtime B.
#try raise_E() with
| E -> print_string "E is raised"
| _ -> print_string "An exception is raised"
;;
- : unit = ()
E is raised
#try raise_e(E) with
| E -> print_string "E is raised"
| _ -> print_string "An exception is raised"
;;
- : unit = ()
E is raised
Now both the copy of the other E defined on runtime A (raised by raise_E) and the copy of a copy of B's own E (raised by raise_e) match E in the exception handlers.
There is no miracle here: the JoCaml runtime system of B intercepts exceptions raised by A and turns all exceptions named E into a single exception. As a result, there is only one exception E on runtime B. Observe that the def exception construct is a non-trivial semantic change over Objective Caml: it more or less introduces matching of exceptions by structure, but only for exceptions raised from one runtime to another. Be cautious. Notice that applying the def exception construct twice to the same exception yields a fatal error.
By default, JoCaml runtimes share all built-in exceptions (such as Not_found, Invalid_argument, etc.) and the Join.Exit exception.
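One way to picture what def exception achieves is to re-identify incoming exceptions by name. The sketch below is only an approximation built with Printexc.exn_slot_name, not JoCaml's actual mechanism; reidentify and local_exceptions are hypothetical names, and only constant exceptions (without arguments) are handled:

```ocaml
module B = struct exception E end

(* A copy of B.E as it would arrive from another runtime. *)
let received : exn = Marshal.from_string (Marshal.to_string B.E []) 0

(* Hypothetical table of local exceptions declared with "def exception". *)
let local_exceptions = [ B.E ]

(* Approximation of the idea behind def exception: replace an incoming
   exception by the local one carrying the same name. Only constant
   exceptions are handled; arguments would have to be rebuilt. *)
let reidentify (e : exn) : exn =
  let name = Printexc.exn_slot_name e in
  match
    List.find_opt (fun l -> Printexc.exn_slot_name l = name) local_exceptions
  with
  | Some local -> local
  | None -> e

let describe e =
  match e with B.E -> "E is raised" | _ -> "An exception is raised"

let () =
  print_endline (describe received);
  print_endline (describe (reidentify received))
```

Matching on names is what makes the construct powerful and risky at the same time: any exception that happens to carry the same name is merged, which is why the text above recommends caution.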
Communications through the name service are not typed. In the following example we define a synchronous channel of type int -> int and register it on the name service.
#def f (x) = reply x+1 to f
;;
val f : int -> int = <fun>
#Join.Ns.register Join.Ns.here "incr" (f: (int -> int))
;;
- : unit = ()
Then, we retrieve the channel and use it with type float -> float.
#let g = (Join.Ns.lookup Join.Ns.here "incr" : (float -> float))
;;
val g : float -> float = <fun>
#print_float (g 0.5);;
- : unit = ()
9.35959773756e-232
We obtain a meaningless value, and no type error is reported! In most situations, such a mismatch makes JoCaml crash. A good programming discipline is to define shared types in a separate file and to annotate the calls to Join.Ns.register and Join.Ns.lookup with these types.
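This discipline can be pushed one step further by giving each key a type. In the plain-OCaml sketch below, a hypothetical Shared module plays the role of the separate file of shared types: its abstract 'a key type ties each string to the type of the value stored under it, so that a mismatched lookup is rejected by the compiler. The table is a single-process stand-in for the name server and only stores marshalable data:

```ocaml
(* Hypothetical module of shared types, meant to be compiled into every
   runtime that uses the keys. The abstract type 'a key ties a string to
   the type of the value stored under it. *)
module Shared : sig
  type 'a key
  val name : 'a key -> string
  val counter : int key
  val motd : string key
end = struct
  type 'a key = string
  let name k = k
  let counter = "counter"
  let motd = "motd"
end

(* Single-process stand-in for the name server; stores marshaled data. *)
let table : (string, string) Hashtbl.t = Hashtbl.create 16

let register (k : 'a Shared.key) (v : 'a) : unit =
  Hashtbl.replace table (Shared.name k) (Marshal.to_string v [])

let lookup (k : 'a Shared.key) : 'a =
  Marshal.from_string (Hashtbl.find table (Shared.name k)) 0

let () =
  register Shared.counter 41;
  register Shared.motd "welcome";
  (* (lookup Shared.counter : float) would be rejected at compile time *)
  Printf.printf "%d %s\n" (lookup Shared.counter + 1) (lookup Shared.motd)
```

This prints 42 welcome. The guarantee only holds if both runtimes are compiled against the same Shared module; the strings themselves still travel untyped.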