Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/concur/unix-servers.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/concur/unix-servers.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 645 - (view) (download)

(*
 * Handling compile-servers under Unix- (and Unix-like) operating systems.
 *
 * This is still rather crude and not very robust.
 *
 * (C) 1999 Lucent Technologies, Bell Laboratories
 *
 * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
 *)
structure Servers :> SERVERS = struct

    structure P = Posix

    (* Optional path translation applied to absolute file descriptors
     * before they are sent to a particular server (see fname). *)
    type pathtrans = (string -> string) option

    (* One attached compile server ("slave").
     *   id             - unique number obtained from newId
     *   name           - printable name used in messages
     *   proc           - the Unix process; its streams carry the protocol
     *   pt             - path translation used by fname
     *   pref           - preference; grab picks the idle server with the
     *                    largest pref
     *   decommissioned - set by serverExit once the server's output has
     *                    reached EOF and the process has been reaped *)
    datatype server = S of { id: int,
                             name: string,
                             proc: Unix.proc,
                             pt: pathtrans,
                             pref: int,
                             decommissioned: bool ref }

    (* Field selectors / accessors. *)
    fun servId (S { id, ... }) = id
    fun decommission (S { decommissioned, ... }) = decommissioned := true
    fun decommissioned (S { decommissioned = d, ... }) = !d
    fun servName (S { name, ... }) = name
    fun servPref (S { pref, ... }) = pref
    fun servPT (S { pt, ... }) = pt
    fun servProc (S { proc, ... }) = proc

    (* servIns is read to obtain the server's responses (see wait_status);
     * servOuts is written to send it commands (see send). *)
    val servIns = #1 o Unix.streamsOf o servProc
    val servOuts = #2 o Unix.streamsOf o servProc

    (* Generator of fresh server ids: 0, 1, 2, ... *)
    val newId = let
        val r = ref 0
    in
        fn () => let val i = !r in r := i + 1; i end
    end

    (* Whether compilation on servers is currently switched on
     * (see enable, disable, withServers and grab). *)
    val enabled = ref false

    (* Servers that are currently not working, and the condition that
     * mark_idle signals whenever a server joins that list. *)
    val idle = ref ([]: server list)
    val someIdle = ref (Concur.pcond ())

    local
        (* All registered (attached and not yet removed) servers, by id. *)
        val all = ref (IntMap.empty: server IntMap.map)
        fun nservers () = IntMap.numItems (!all)
    in
        fun allIdle () = length (!idle) = nservers ()
        fun noServers () = nservers () = 0
        fun allServers () = IntMap.listItems (!all)
        fun addServer s = all := IntMap.insert (!all, servId s, s)
        fun delServer s =
            (all := #1 (IntMap.remove (!all, servId s));
             (* If this was the last server we need to wake up
              * everyone who is currently waiting to grab a server.
              * The "grab"-loop will then gracefully fail and
              * not cause a deadlock. *)
             if noServers () then
                 (Say.dsay ["No more servers -> back to sequential mode.\n"];
                  Concur.signal (!someIdle))
             else ())
    end

    (* This really shouldn't be here, but putting it into SrcPath would
     * create a dependency cycle. Some better structuring will fix this.
     *
     * A descriptor is absolute if its first character is #"/" or #"%".
     * The empty string is not absolute.  The explicit size check replaces
     * a catch-all handler around String.sub, which raises Subscript
     * on "". *)
    fun isAbsoluteDescr d =
        size d > 0 andalso
        (case String.sub (d, 0) of #"/" => true | #"%" => true | _ => false)

    (* The name under which server s should see file n.  Absolute
     * descriptors go through s's path translation, if it has one.
     * Everything else is passed through unchanged. *)
    fun fname (n, s) =
        case servPT s of
            NONE => n
          | SOME f => if isAbsoluteDescr n then f n else n

    (* Protect some code segment from sigPIPE signals: run work () with
     * SIGPIPE set to IGNORE.  The handler returned by disable is handed
     * to reenable, which reinstalls it. *)
    fun pprotect work = let
        val pipe = UnixSignals.sigPIPE
        fun disable () = Signals.setHandler (pipe, Signals.IGNORE)
        fun reenable sa = ignore (Signals.setHandler (pipe, sa))
    in
        SafeIO.perform { openIt = disable, closeIt = reenable,
                         work = fn _ => work (), cleanup = fn _ => () }
    end

    (* Send a message to a slave (logged via Say.dsay).  This is
     * sigPIPE-protected.  Any exception from writing or flushing is
     * swallowed.  A server that has gone away is noticed when a reader
     * sees EOF on its input stream. *)
    fun send (s, msg) = let
        val outs = servOuts s
    in
        Say.dsay ["-> ", servName s, " : ", msg];
        pprotect (fn () =>
                     (TextIO.output (outs, msg); TextIO.flushOut outs)
                     handle _ => ())
    end

    (* Debug output: names of all currently idle servers. *)
    fun show_idle () =
        Say.dsay ("Idle:" ::
                  foldr (fn (s, l) => " " :: servName s :: l) ["\n"] (!idle))

    (* Mark a server idle and signal everyone who is currently waiting
     * for an idle server. *)
    fun mark_idle s =
        (idle := s :: !idle;
         Concur.signal (!someIdle);
         Say.dsay ["Scheduler: slave ", servName s, " has become idle.\n"];
         show_idle ())

    (* Grab an idle server; wait if necessary; reinitialize condition
     * if taking the only server.  Among several idle servers, the one
     * with the highest preference is chosen.  Returns NONE when servers
     * are disabled or none are attached. *)
    fun grab () =
        (* We need to check the following every time (at least the
         * "noServers" part) because it might be that all servers
         * have meanwhile gone away for some reason (crashed, etc.). *)
        if not (!enabled) orelse noServers () then NONE
        else case !idle of
                 [] => (Concur.wait (!someIdle); grab ())
               | [only] =>
                 (Say.dsay ["Scheduler: taking last idle slave (",
                            servName only, ").\n"];
                  idle := [];
                  someIdle := Concur.pcond ();
                  SOME only)
               | first :: more => let
                     (* b: best so far; rest: all servers not chosen *)
                     fun best (b, [], rest) = (b, rest)
                       | best (b, s :: r, rest) = let
                             val bp = servPref b
                             val sp = servPref s
                         in
                             if sp > bp then best (s, r, b :: rest)
                             else best (b, r, s :: rest)
                         end
                     val (b, rest) = best (first, more, [])
                 in
                     Say.dsay ["Scheduler: taking idle slave (",
                               servName b, ").\n"];
                     idle := rest;
                     show_idle ();
                     SOME b
                 end

    (* Called once server s's response stream has reached EOF.  It reaps
     * the process and reports whether it shut down cleanly or crashed.
     * It then decommissions s and removes it from the set of registered
     * servers. *)
    fun serverExit s = let
        val what =
            case pprotect (fn () => Unix.reap (servProc s)) of
                (P.Process.W_EXITED | P.Process.W_EXITSTATUS 0w0) =>
                "shut down"
              | _ => "crashed"
    in
        decommission s;
        Say.say ["[!Slave ", servName s, " has ", what, ".]\n"];
        delServer s
    end

    (* Read responses from server s until it reports a status.
     *  - "SLAVE: ok": s is marked idle, the collected non-protocol output
     *    is shown if echo is set, and the result is true.
     *  - "SLAVE: error": s is marked idle and the result is false.
     *  - EOF: serverExit s is called and the result is false.
     *  - s already decommissioned: the result is false.
     * Other "SLAVE:" lines are reported as unexpected.  Any remaining
     * lines are collected into the report. *)
    fun wait_status (s, echo) = let
        val name = servName s
        val ins = servIns s

        fun unexpected l = let
            fun word (w, l) = " " :: w :: l
        in
            Say.say ("! Unexpected response from slave " ::
                     name :: ":" :: foldr word ["\n"] l)
        end

        (* report is accumulated in reverse order, hence the rev *)
        val show =
            if echo then (fn report => Say.say (rev report))
            else (fn _ => ())

        fun loop report =
            if decommissioned s then false
            else
                (Concur.wait (Concur.inputReady ins);
                 case TextIO.inputLine ins of
                     "" => (serverExit s; false)
                   | line =>
                     (Say.dsay ["<- ", name, ": ", line];
                      case String.tokens Char.isSpace line of
                          ["SLAVE:", "ok"] =>
                          (mark_idle s; show report; true)
                        | ["SLAVE:", "error"] =>
                          (mark_idle s;
                           (* In the case of error we don't show
                            * the report because it will be re-enacted
                            * locally. *)
                           false)
                        | "SLAVE:" :: l => (unexpected l; loop report)
                        | _ => loop (line :: report)))
    in
        loop []
    end

    (* Send a "ping" to all servers and wait for the "pong" responses.
     * This should work for all servers, busy or not. Busy servers will
     * take longer to respond because they first need to finish what
     * they are doing.
     * We use wait_all after we receive an interrupt signal. The ping-pong
     * protocol does not suffer from the race condition that we would have
     * if we wanted to only wait for "ok"s from currently busy servers.
     * (The race would happen when an interrupt occurs between receiving
     * "ok" and marking the corresponding slave idle).
     * Servers that reach EOF instead of answering are retired via
     * serverExit, so that only live servers end up on the idle list. *)
    fun wait_all is_int = let
        val al = allServers ()
        fun ping s = let
            val name = servName s
            val ins = servIns s
            fun loop () =
                if decommissioned s then ()
                else
                    (Concur.wait (Concur.inputReady ins);
                     case TextIO.inputLine ins of
                         "" =>
                         (* server has gone away -> no pong.  Reap and
                          * remove it instead of marking it idle below. *)
                         (Say.dsay ["<-(EOF) ", name, "\n"];
                          serverExit s)
                       | line =>
                         (Say.dsay ["<- ", name, ": ", line];
                          case String.tokens Char.isSpace line of
                              ["SLAVE:", "pong"] => ()
                            | _ => loop ()))
        in
            send (s, "ping\n");
            loop ()
        end
        val si = Concur.pcond ()
    in
        if List.null al then ()
        else (Concur.signal si;
              if is_int then
                  Say.say
                      ["Waiting for attached servers to become idle...\n"]
              else ());
        app ping al;
        idle := List.filter (not o decommissioned) al;
        someIdle := si
    end

    (* Shut down server s.  First method () is run, which is expected to
     * make the server exit.  Then its responses are drained until
     * wait_status has decommissioned it.  unidle is repeated on every
     * round because wait_status may mark s idle again when it reads an
     * ok/error status. *)
    fun shutdown (s, method) = let
        val i = servId s
        fun unidle () =
            idle := #2 (List.partition (fn s' => servId s' = i) (!idle))
        fun waitForExit () =
            (unidle ();
             ignore (wait_status (s, false));
             if not (decommissioned s) then
                 waitForExit ()
             else ())
    in
        method ();
        waitForExit ()
    end

    (* Ask server s to shut down via the protocol. *)
    fun stop s =
        shutdown (s, fn () => send (s, "shutdown\n"))

    (* Terminate server s with SIGTERM. *)
    fun kill s =
        shutdown (s, fn () => Unix.kill (servProc s, P.Signal.term))

    (* Launch a new server by executing cmd and wait for its first status
     * line.  If it reports "ok", it is registered and SOME s is returned;
     * otherwise the result is NONE.
     * NOTE(review): s is not yet registered while wait_status runs, yet
     * both of the following can happen during that call:
     *  - an EOF makes serverExit call delServer on s; confirm that
     *    IntMap.remove tolerates a missing key (the smlnj-lib ORD_MAP
     *    remove raises LibBase.NotFound);
     *  - an "error" status leaves s on the idle list.
     * TODO confirm both. *)
    fun start { name, cmd, pathtrans, pref } = let
        val p = Unix.execute cmd
        val i = newId ()
        val s = S { id = i, name = name,
                    proc = p, pt = pathtrans, pref = pref,
                    decommissioned = ref false }
    in
        if wait_status (s, false) then (addServer s; SOME s)
        else NONE
    end

    (* Compile file p on some idle server, echoing that server's output
     * on success.  The result is false when no server could be grabbed
     * or the server did not report "ok".  Per the comment in
     * wait_status, the work is then presumably redone locally by the
     * caller. *)
    fun compile p =
        case grab () of
            NONE => false
          | SOME s => let
                val f = fname (p, s)
            in
                Say.vsay ["[(", servName s, "): compiling ", f, "]\n"];
                send (s, concat ["compile ", f, "\n"]);
                wait_status (s, true)
            end

    (* Reset Concur and resynchronize with all servers (see wait_all). *)
    fun reset is_int = (Concur.reset (); wait_all is_int)

    (* Run st on every currently idle server, each in its own Concur
     * thread, and wait for all of them.  The idle list is emptied first.
     * The servers rejoin it as wait_status sees their status lines.
     * reset is used as the cleanup action. *)
    fun startAll st = let
        val l = !idle
        val _ = idle := []
        val tl = map (fn s => Concur.fork (fn () => st s)) l
    in
        SafeIO.perform { openIt = fn () => (),
                         closeIt = fn () => (),
                         work = fn () => app Concur.wait tl,
                         cleanup = reset }
    end

    (* Tell all idle servers to change directory to d (path-translated). *)
    fun cd d = let
        fun st s = let
            val d' = fname (d, s)
        in
            send (s, concat ["cd ", d', "\n"]);
            ignore (wait_status (s, false))
        end
    in
        startAll st
    end

    (* Send the "cm" command for archos and the given project file
     * (path-translated) to all idle servers. *)
    fun cm { archos, project } = let
        fun st s = let
            val f = fname (project, s)
        in
            send (s, concat ["cm ", archos, " ", f, "\n"]);
            ignore (wait_status (s, false))
        end
    in
        startAll st
    end

    (* Send the "cmb" command with archos and root to all idle servers.
     * root is sent as-is, without path translation. *)
    fun cmb { archos, root } = let
        fun st s =
            (send (s, concat ["cmb ", archos, " ", root, "\n"]);
             ignore (wait_status (s, false)))
    in
        startAll st
    end

    (* Send the "cmb" command with archos only to all idle servers. *)
    fun cmb_new { archos } = let
        fun st s =
            (send (s, concat ["cmb ", archos, "\n"]);
             ignore (wait_status (s, false)))
    in
        startAll st
    end

    (* Send "dirbase db" to all idle servers. *)
    fun dirbase db = let
        fun st s =
            (send (s, concat ["dirbase ", db, "\n"]);
             ignore (wait_status (s, false)))
    in
        startAll st
    end

    fun enable () = enabled := true
    fun disable () = enabled := false

    (* Run f with servers enabled; they are disabled again afterwards.
     * reset is used as the cleanup action. *)
    fun withServers f =
        SafeIO.perform { openIt = enable,
                         closeIt = disable,
                         work = f,
                         cleanup = reset }

    val name = servName
end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0