Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/concur/unix-servers.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/concur/unix-servers.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 805 - (view) (download)

1 : blume 464 (*
2 :     * Handling compile-servers under Unix- (and Unix-like) operating systems.
3 :     *
4 :     * This is still rather crude and not very robust.
5 :     *
6 :     * (C) 1999 Lucent Technologies, Bell Laboratories
7 :     *
8 :     * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
9 :     *)
10 :    
(* It is unfortunate but necessary to use a transparent match here.
 * Otherwise the "hack" in $smlnj/cm/full.cm won't work. *)
structure Servers : SERVERS = struct

    structure P = Posix

    (* Optional translation applied to absolute pathname encodings before
     * they are sent to a particular server (see "fname"). *)
    type pathtrans = (string -> string) option

    (* One compile server (slave process):
     *   id             - fresh number from "newId"; key in the server table
     *   name           - label used in messages
     *   proc           - the running slave process
     *   pt             - path translation applied by "fname"
     *   pref           - preference; "grab" takes the idle server with the
     *                    largest value
     *   decommissioned - set by "serverExit" once EOF is read from the slave *)
    datatype server = S of { id: int,
                             name: string,
                             proc: Unix.proc,
                             pt: pathtrans,
                             pref: int,
                             decommissioned: bool ref }

    type server_handle = int * string (* id and name *)

    fun handleOf (S { id, name, ... }) = (id, name)
    fun servId (S { id, ... }) = id
    fun decommission (S { decommissioned, ... }) = decommissioned := true
    fun decommissioned (S { decommissioned = d, ... }) = !d
    fun servName (S { name, ... }) = name
    fun servPref (S { pref, ... }) = pref
    fun servPT (S { pt, ... }) = pt
    fun servProc (S { proc, ... }) = proc

    (* Stream for reading the slave's replies / for sending it commands. *)
    val servIns = #1 o Unix.streamsOf o servProc
    val servOuts = #2 o Unix.streamsOf o servProc

    (* Generator of fresh server ids: 0, 1, 2, ... *)
    val newId = let
        val r = ref 0
    in
        fn () => let val i = !r in r := i + 1; i end
    end

    (* "grab" hands out servers only while this is true
     * (see "withServers"). *)
    val enabled = ref false

    (* Servers currently without work. *)
    val idle = ref ([]: server list)

    (* Signaled to wake up threads blocked in "grab"; replaced by a fresh
     * condition when "grab" takes the last idle server. *)
    val someIdle = ref (Concur.pcond ())

    local
        (* every registered server, keyed by its id *)
        val all = ref (IntMap.empty: server IntMap.map)
        fun nservers () = IntMap.numItems (!all)
    in
        fun serverOf (h: server_handle) = IntMap.find (!all, #1 h)
        fun allIdle () = length (!idle) = nservers ()
        fun noServers () = nservers () = 0
        fun allServers () = IntMap.listItems (!all)

        (* Register a server in the table of all servers. *)
        fun addServer s = all := IntMap.insert (!all, servId s, s)

        (* Unregister a server that has gone away. *)
        fun delServer s = let
            val i = servId s
        in
            (* "start" calls "wait_status" before "addServer", so a slave
             * that dies during start-up arrives here without ever having
             * been registered.  IntMap.remove raises NotFound for an
             * absent key, which would then escape from "start"; only
             * remove servers that are actually in the table. *)
            (case IntMap.find (!all, i) of
                 NONE => ()
               | SOME _ => all := #1 (IntMap.remove (!all, i)));
            (* If this was the last server, then we need to wake up
             * everyone who is currently waiting to grab a server.
             * The "grab"-loop will then gracefully fail and
             * not cause a deadlock. *)
            if noServers () then
                (Say.dsay ["No more servers -> back to sequential mode.\n"];
                 Concur.signal (!someIdle))
            else ()
        end
    end

    (* translate absolute pathname encoding; relative and anchored paths
     * stay unchanged *)
    fun fname (n, s) =
        case servPT s of
            NONE => n
          | SOME f => if SrcPath.encodingIsAbsolute n then f n else n

    (* Protect some code segment from sigPIPE signals: run "work ()" with
     * sigPIPE ignored and reinstall the previous handler afterwards. *)
    fun pprotect work = let
        val pipe = UnixSignals.sigPIPE
        fun disable () = Signals.setHandler (pipe, Signals.IGNORE)
        fun reenable sa = ignore (Signals.setHandler (pipe, sa))
    in
        SafeIO.perform { openIt = disable, closeIt = reenable,
                         work = fn _ => work (), cleanup = fn _ => () }
    end

    (* Send a message to a slave.  This must be sigPIPE-protected.
     * Any exception raised while writing is deliberately ignored. *)
    fun send (s, msg) = let
        val outs = servOuts s
    in
        Say.dsay ["-> ", servName s, " : ", msg];
        pprotect (fn () =>
                     (TextIO.output (outs, msg); TextIO.flushOut outs)
                     handle _ => ())
    end

    (* Debug output: list the names of all idle servers. *)
    fun show_idle () =
        Say.dsay ("Idle:" ::
                  foldr (fn (s, l) => " " :: servName s :: l) ["\n"] (!idle))

    (* Mark a server idle; signal all those who are currently waiting for
     * that... *)
    fun mark_idle s =
        (idle := s :: !idle;
         Concur.signal (!someIdle);
         Say.dsay ["Scheduler: slave ", servName s, " has become idle.\n"];
         show_idle ())

    (* Grab an idle server; wait if necessary; reinitialize condition
     * if taking the only server.  Returns NONE when servers are disabled
     * or none exist.  Among several idle servers the one with the
     * largest "pref" is chosen. *)
    fun grab () =
        (* We need to check the following every time (at least the
         * "noServers" part) because it might be that all servers
         * have meanwhile gone away for some reason (crashed, etc.). *)
        if not (!enabled) orelse noServers () then NONE
        else case !idle of
                 [] => (Concur.wait (!someIdle); grab ())
               | [only] =>
                 (Say.dsay ["Scheduler: taking last idle slave (",
                            servName only, ").\n"];
                  idle := [];
                  someIdle := Concur.pcond ();
                  SOME only)
               | first :: more => let
                     (* b: best candidate so far; rest: everything else *)
                     fun best (b, [], rest) = (b, rest)
                       | best (b, s :: r, rest) = let
                             val bp = servPref b
                             val sp = servPref s
                         in
                             if sp > bp then best (s, r, b :: rest)
                             else best (b, r, s :: rest)
                         end
                     val (b, rest) = best (first, more, [])
                 in
                     Say.dsay ["Scheduler: taking idle slave (",
                               servName b, ").\n"];
                     idle := rest;
                     show_idle ();
                     SOME b
                 end

    (* Read lines from slave s until it reports a status.
     * Returns true on "SLAVE: ok" (after echoing the collected
     * non-protocol lines if "echo" is set) and false on "SLAVE: error",
     * on EOF (the slave is then reaped and decommissioned), or if s is
     * already decommissioned.  Both "ok" and "error" mark s idle. *)
    fun wait_status (s, echo) = let
        val name = servName s
        val ins = servIns s

        fun unexpected l = let
            fun word (w, l) = " " :: w :: l
        in
            Say.say ("! Unexpected response from slave " ::
                     name :: ":" :: foldr word ["\n"] l)
        end

        (* EOF from the slave: reap it, report how it ended, and drop it. *)
        fun serverExit () = let
            val what =
                case pprotect (fn () => Unix.reap (servProc s)) of
                    (P.Process.W_EXITED | P.Process.W_EXITSTATUS 0w0) =>
                    "shut down"
                  | _ => "crashed"
        in
            decommission s;
            Say.say ["[!Slave ", name, " has ", what, ".]\n"];
            delServer s
        end

        (* "report" is accumulated in reverse order *)
        val show =
            if echo then (fn report => Say.say (rev report))
            else (fn _ => ())

        fun loop report =
            if decommissioned s then false
            else
                (Concur.wait (Concur.inputReady ins);
                 case TextIO.inputLine ins of
                     "" => (serverExit (); false)
                   | line =>
                     (Say.dsay ["<- ", name, ": ", line];
                      case String.tokens Char.isSpace line of
                          ["SLAVE:", "ok"] =>
                          (mark_idle s; show report; true)
                        | ["SLAVE:", "error"] =>
                          (mark_idle s;
                           (* In the case of error we don't show
                            * the report because it will be re-enacted
                            * locally. *)
                           false)
                        | "SLAVE:" :: l => (unexpected l; loop report)
                        | _ => loop (line :: report)))
    in
        loop []
    end

    (* Send a "ping" to all servers and wait for the "pong" responses.
     * This should work for all servers, busy or no. Busy servers will
     * take longer to respond because they first need to finish what
     * they are doing.
     * We use wait_all after we receive an interrupt signal. The ping-pong
     * protocol does not suffer from the race condition that we would have
     * if we wanted to only wait for "ok"s from currently busy servers.
     * (The race would happen when an interrupt occurs between receiving
     * "ok" and marking the corresponding slave idle).
     * Afterwards every registered server is considered idle. *)
    fun wait_all is_int = let
        val al = allServers ()
        fun ping s = let
            val name = servName s
            val ins = servIns s
            (* skip everything up to the "pong" (or EOF) *)
            fun loop () =
                if decommissioned s then ()
                else
                    (Concur.wait (Concur.inputReady ins);
                     case TextIO.inputLine ins of
                         "" =>
                         (* server has gone away -> no pong *)
                         Say.dsay ["<-(EOF) ", name, "\n"]
                       | line =>
                         (Say.dsay ["<- ", name, ": ", line];
                          case String.tokens Char.isSpace line of
                              ["SLAVE:", "pong"] => ()
                            | _ => loop ()))
        in
            send (s, "ping\n");
            loop ()
        end
        (* fresh condition; signaled up front when there will be idle servers *)
        val si = Concur.pcond ()
    in
        if List.null al then ()
        else (Concur.signal si;
              if is_int then
                  Say.say
                      ["Waiting for attached servers to become idle...\n"]
              else ());
        app ping al;
        idle := al;
        someIdle := si
    end

    (* Shut down slave s: run "method" (which asks or forces it to quit),
     * then keep reading its output until it has been decommissioned. *)
    fun shutdown (s, method) = let
        val i = servId s
        fun unidle () =
            idle := #2 (List.partition (fn s' => servId s' = i) (!idle))
        (* unidle on every round: wait_status may have marked s idle again *)
        fun waitForExit () =
            (unidle ();
             ignore (wait_status (s, false));
             if not (decommissioned s) then
                 waitForExit ()
             else ())
    in
        method ();
        waitForExit ()
    end

    (* Ask the slave to shut down. *)
    fun stop s =
        shutdown (s, fn () => send (s, "shutdown\n"))

    (* Terminate the slave with SIGTERM. *)
    fun kill s =
        shutdown (s, fn () => Unix.kill (servProc s, P.Signal.term))

    (* Launch a new slave by executing "cmd" and wait for its first status
     * report.  The server is registered and returned only if that report
     * is "SLAVE: ok"; otherwise the result is NONE. *)
    fun start { name, cmd, pathtrans, pref } = let
        val p = Unix.execute cmd
        val i = newId ()
        val s = S { id = i, name = name,
                    proc = p, pt = pathtrans, pref = pref,
                    decommissioned = ref false }
    in
        (* NOTE(review): on "SLAVE: error" wait_status has already put s on
         * the idle list although s is never registered here, and the
         * process is left running -- confirm whether that is intended. *)
        if wait_status (s, false) then (addServer s; SOME s)
        else NONE
    end

    (* Compile source p on some idle server.  Returns false if no server
     * could be obtained or the slave reported an error. *)
    fun compile p =
        case grab () of
            NONE => false
          | SOME s => let
                val f = fname (p, s)
            in
                Say.vsay ["[(", servName s, "): compiling ", f, "]\n"];
                send (s, concat ["compile ", f, "\n"]);
                wait_status (s, true)
            end

    (* Reset the thread system, then wait until every server is idle. *)
    fun reset is_int = (Concur.reset (); wait_all is_int)

    (* Run "st" on each currently idle server, each in its own thread, and
     * wait for all of them.  The idle list is emptied first; the "st"
     * functions below put servers back via wait_status.  "reset" is the
     * cleanup action of SafeIO.perform. *)
    fun startAll st = let
        val l = !idle
        val _ = idle := []
        val tl = map (fn s => Concur.fork (fn () => st s)) l
    in
        SafeIO.perform { openIt = fn () => (),
                         closeIt = fn () => (),
                         work = fn () => app Concur.wait tl,
                         cleanup = reset }
    end

    (* Tell the idle servers to change to directory d. *)
    fun cd d = let
        fun st s = let
            val d' = fname (d, s)
        in
            send (s, concat ["cd ", d', "\n"]);
            ignore (wait_status (s, false))
        end
    in
        startAll st
    end

    (* Send a "cm" command (archos and translated project path) to the
     * idle servers. *)
    fun cm { archos, project } = let
        fun st s = let
            val f = fname (project, s)
        in
            send (s, concat ["cm ", archos, " ", f, "\n"]);
            ignore (wait_status (s, false))
        end
    in
        startAll st
    end

    (* Send a "cmb" command to the idle servers (paths are not translated). *)
    fun cmb { dirbase, archos, root } = let
        fun st s =
            (send (s, concat ["cmb ", dirbase, " ", archos, " ", root, "\n"]);
             ignore (wait_status (s, false)))
    in
        startAll st
    end

    (* Send a "cmb_reset" command to the idle servers. *)
    fun cmb_reset { archos } = let
        fun st s =
            (send (s, concat ["cmb_reset ", archos, "\n"]);
             ignore (wait_status (s, false)))
    in
        startAll st
    end

    fun enable () = enabled := true
    fun disable () = enabled := false

    (* Run f with servers enabled; they are disabled again afterwards and
     * "reset" is the cleanup action of SafeIO.perform. *)
    fun withServers f =
        SafeIO.perform { openIt = enable,
                         closeIt = disable,
                         work = f,
                         cleanup = reset }

    fun name ((_, n) : server_handle) = n

    (* Apply f to the server behind handle h, if it is still registered. *)
    fun handleFun f h =
        case serverOf h of
            NONE => ()
          | SOME s => f s

    (* Exported versions working on server handles instead of servers. *)
    val stop = handleFun stop
    val kill = handleFun kill
    val start = Option.map handleOf o start

    (* Register "cd" as a client of SrcPath's notifications. *)
    val _ = SrcPath.addClientToBeNotified cd
end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0