SCM Repository
Annotation of /sml/trunk/src/cm/concur/unix-servers.sml
Parent Directory
|
Revision Log
Revision 645 - (view) (download)
1 : | blume | 464 | (* |
2 : | * Handling compile-servers under Unix- (and Unix-like) operating systems. | ||
3 : | * | ||
4 : | * This is still rather crude and not very robust. | ||
5 : | * | ||
6 : | * (C) 1999 Lucent Technologies, Bell Laboratories | ||
7 : | * | ||
8 : | * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp) | ||
9 : | *) | ||
10 : | structure Servers :> SERVERS = struct | ||
11 : | |||
12 : | blume | 484 | structure P = Posix |
13 : | |||
(* Optional translation applied to path names before they are sent to a
 * particular server (see fname). *)
type pathtrans = (string -> string) option

(* A compile server: a unique id, a display name, the slave process,
 * its path translation, a scheduling preference and a flag that is set
 * once the server has been taken out of service. *)
datatype server = S of { id: int,
                         name: string,
                         proc: Unix.proc,
                         pt: pathtrans,
                         pref: int,
                         decommissioned: bool ref }

(* Field accessors. *)
fun servId (S r) = #id r
fun servName (S r) = #name r
fun servPref (S r) = #pref r
fun servPT (S r) = #pt r
fun servProc (S r) = #proc r

(* Set and query the "decommissioned" flag. *)
fun decommission (S { decommissioned = flag, ... }) = flag := true
fun decommissioned (S { decommissioned = flag, ... }) = !flag

(* First and second component of the process' stream pair. *)
fun servIns s = #1 (Unix.streamsOf (servProc s))
fun servOuts s = #2 (Unix.streamsOf (servProc s))
31 : | |||
(* Generator for fresh server ids: yields 0, 1, 2, ... on successive
 * calls. *)
local
    val counter = ref 0
in
    fun newId () = !counter before counter := !counter + 1
end
(* Whether compile servers may currently be used (see grab). *)
val enabled : bool ref = ref false

(* Servers that are currently idle. *)
val idle : server list ref = ref []
(* Condition that gets signalled when a server becomes idle. *)
val someIdle = ref (Concur.pcond ())
41 : | |||
local
    (* Registry of all servers that have been added, keyed by server id. *)
    val all = ref (IntMap.empty: server IntMap.map)
    fun nservers () = IntMap.numItems (!all)
in
    (* True iff the idle list is as long as the registry. *)
    fun allIdle () = length (!idle) = nservers ()
    (* True iff no server is registered. *)
    fun noServers () = nservers () = 0
    (* All registered servers. *)
    fun allServers () = IntMap.listItems (!all)
    (* Register a server under its id. *)
    fun addServer s = all := IntMap.insert (!all, servId s, s)
    (* Unregister a server.  Doing this for a server that was never
     * registered is a no-op on the registry: IntMap.remove raises an
     * exception for an absent key, and "start" runs wait_status (and
     * thereby possibly serverExit/delServer) before it calls addServer. *)
    fun delServer s = let
        val i = servId s
    in
        (case IntMap.find (!all, i) of
             SOME _ => all := #1 (IntMap.remove (!all, i))
           | NONE => ());
        (* If this was the last server we need to wake up
         * everyone who is currently waiting to grab a server.
         * The "grab"-loop will then gracefully fail and
         * not cause a deadlock. *)
        if noServers () then
            (Say.dsay ["No more servers -> back to sequential mode.\n"];
             Concur.signal (!someIdle))
        else ()
    end
end
65 : | |||
(* This really shouldn't be here, but putting it into SrcPath would
 * create a dependency cycle.  Some better structuring will fix this.
 *
 * A path description counts as absolute if it starts with "/" or "%".
 * The empty string is not absolute; it is handled by an explicit length
 * check rather than by catching (and thereby hiding) arbitrary
 * exceptions. *)
fun isAbsoluteDescr d =
    size d > 0 andalso
    (case String.sub (d, 0) of
         #"/" => true
       | #"%" => true
       | _ => false)
71 : | blume | 464 | |
(* Name under which path n is passed to server s: the server's path
 * translation is applied only when there is one and n is absolute;
 * otherwise n is used unchanged. *)
fun fname (n, s) =
    case (servPT s, isAbsoluteDescr n) of
        (SOME translate, true) => translate n
      | _ => n
76 : | |||
(* Run "work ()" with sigPIPE ignored; the handler that was in place
 * before is passed back to Signals.setHandler by the closeIt action. *)
fun pprotect work = let
    val sigPipe = UnixSignals.sigPIPE
in
    SafeIO.perform
        { openIt = fn () => Signals.setHandler (sigPipe, Signals.IGNORE),
          closeIt = fn old => ignore (Signals.setHandler (sigPipe, old)),
          work = fn _ => work (),
          cleanup = fn _ => () }
end
86 : | |||
(* Write msg to the slave's output stream and flush it.  The write runs
 * sigPIPE-protected, and any exception it raises is deliberately
 * dropped (best effort). *)
fun send (s, msg) = let
    val out = servOuts s
    fun transmit () =
        (TextIO.output (out, msg); TextIO.flushOut out) handle _ => ()
in
    Say.dsay ["-> ", servName s, " : ", msg];
    pprotect transmit
end
96 : | |||
(* Debug output: the names of all currently idle servers on one line. *)
fun show_idle () = let
    val names = List.concat (map (fn s => [" ", servName s]) (!idle))
in
    Say.dsay ("Idle:" :: names @ ["\n"])
end
100 : | |||
(* Put server s onto the idle list and signal the "someIdle" condition
 * so that whoever is waiting for an idle server gets to run. *)
fun mark_idle s = let
    val _ = idle := s :: !idle
    val _ = Concur.signal (!someIdle)
in
    Say.dsay ["Scheduler: slave ", servName s, " has become idle.\n"];
    show_idle ()
end
108 : | |||
(* Take an idle server off the idle list, waiting for one if necessary.
 * Returns NONE when servers are disabled or none are registered.
 * When the last idle server is taken, "someIdle" is replaced by a
 * fresh condition.  Among several idle servers the one with the
 * highest preference is chosen (the earliest one wins ties). *)
fun grab () =
    (* The test must be repeated on every iteration (at least the
     * "noServers" part) because all servers might meanwhile have
     * gone away for some reason (crashed, etc.). *)
    if !enabled andalso not (noServers ()) then
        (case !idle of
             [] => (Concur.wait (!someIdle); grab ())
           | [only] =>
             (Say.dsay ["Scheduler: taking last idle slave (",
                        servName only, ").\n"];
              idle := [];
              someIdle := Concur.pcond ();
              SOME only)
           | first :: more => let
                 (* Keep the better of "s" and the best so far; the
                  * loser goes onto the list of remaining servers. *)
                 fun pick (s, (best, others)) =
                     if servPref s > servPref best then (s, best :: others)
                     else (best, s :: others)
                 val (chosen, others) = foldl pick (first, []) more
             in
                 Say.dsay ["Scheduler: taking idle slave (",
                           servName chosen, ").\n"];
                 idle := others;
                 show_idle ();
                 SOME chosen
             end)
    else NONE
141 : | |||
(* Read lines from slave s until it reports a status.
 *   "SLAVE: ok"    -> mark s idle; result is true
 *   "SLAVE: error" -> mark s idle; result is false
 *   end of input   -> reap the process, decommission and unregister s;
 *                     result is false
 * Any other "SLAVE:" line is reported as unexpected.  All remaining
 * lines are collected and, if "echo" is set, printed in their original
 * order once the slave says "ok".  The result is false right away if s
 * is already decommissioned. *)
fun wait_status (s, echo) = let
    val slave = servName s
    val input = servIns s

    fun unexpected ws =
        Say.say ("! Unexpected response from slave " ::
                 slave :: ":" ::
                 foldr (fn (w, acc) => " " :: w :: acc) ["\n"] ws)

    fun serverExit () = let
        val status = pprotect (fn () => Unix.reap (servProc s))
        val what =
            case status of
                (P.Process.W_EXITED | P.Process.W_EXITSTATUS 0w0) =>
                "shut down"
              | _ => "crashed"
    in
        decommission s;
        Say.say ["[!Slave ", slave, " has ", what, ".]\n"];
        delServer s
    end

    (* The report is accumulated in reverse. *)
    fun show report = if echo then Say.say (rev report) else ()

    fun loop report =
        if decommissioned s then false
        else
            (Concur.wait (Concur.inputReady input);
             case TextIO.inputLine input of
                 "" => (serverExit (); false)
               | line => dispatch (line, report))

    and dispatch (line, report) =
        (Say.dsay ["<- ", slave, ": ", line];
         case String.tokens Char.isSpace line of
             ["SLAVE:", "ok"] => (mark_idle s; show report; true)
           | ["SLAVE:", "error"] =>
             (* In the case of error we don't show the report
              * because it will be re-enacted locally. *)
             (mark_idle s; false)
           | "SLAVE:" :: rest => (unexpected rest; loop report)
           | _ => loop (line :: report))
in
    loop []
end
191 : | |||
(* Send a "ping" to all servers and wait for the "pong" responses.
 * This should work for all servers, busy or not.  Busy servers will
 * take longer to respond because they first need to finish what
 * they are doing.
 * We use wait_all after we receive an interrupt signal.  The ping-pong
 * protocol does not suffer from the race condition that we would have
 * if we wanted to only wait for "ok"s from currently busy servers.
 * (The race would happen when an interrupt occurs between receiving
 * "ok" and marking the corresponding slave idle.)
 * Afterwards all registered servers form the idle list and "someIdle"
 * is a fresh condition, which has been signalled iff there is at least
 * one server. *)
fun wait_all is_int = let
    val servers = allServers ()

    fun ping s = let
        val slave = servName s
        val input = servIns s
        fun awaitPong () =
            if decommissioned s then ()
            else
                (Concur.wait (Concur.inputReady input);
                 case TextIO.inputLine input of
                     "" =>
                     (* server has gone away -> no pong *)
                     Say.dsay ["<-(EOF) ", slave, "\n"]
                   | line =>
                     (Say.dsay ["<- ", slave, ": ", line];
                      case String.tokens Char.isSpace line of
                          ["SLAVE:", "pong"] => ()
                        | _ => awaitPong ()))
    in
        send (s, "ping\n");
        awaitPong ()
    end

    val fresh = Concur.pcond ()
in
    (case servers of
         [] => ()
       | _ =>
         (Concur.signal fresh;
          if is_int then
              Say.say ["Waiting for attached servers to become idle...\n"]
          else ()));
    app ping servers;
    idle := servers;
    someIdle := fresh
end
235 : | |||
(* Take server s out of service: run "method" (which is expected to
 * make the slave terminate) and then keep reading its status until s
 * has been decommissioned.  Before every read s is removed from the
 * idle list, because wait_status may have put it back there. *)
fun shutdown (s, method) = let
    val target = servId s
    fun unidle () =
        idle := List.filter (fn s' => servId s' <> target) (!idle)
    fun waitForExit () =
        (unidle ();
         ignore (wait_status (s, false));
         if decommissioned s then () else waitForExit ())
in
    method ();
    waitForExit ()
end
250 : | blume | 464 | |
(* Shut server s down by sending it the "shutdown" command. *)
fun stop s = let
    fun request () = send (s, "shutdown\n")
in
    shutdown (s, request)
end
253 : | blume | 464 | |
(* Shut server s down by sending its process the TERM signal. *)
fun kill s = let
    fun terminate () = Unix.kill (servProc s, P.Signal.term)
in
    shutdown (s, terminate)
end
256 : | blume | 464 | |
(* Launch a new slave using "cmd" and wait for its first status report.
 * The server is registered and returned only if that report is "ok";
 * otherwise the result is NONE.
 * NOTE(review): when the first report is "SLAVE: error", wait_status
 * has already put the server onto the idle list, yet it is never
 * registered here -- confirm that this is intended. *)
fun start { name, cmd, pathtrans, pref } = let
    (* The process is started before the id is drawn. *)
    val s = S { proc = Unix.execute cmd,
                id = newId (),
                name = name,
                pt = pathtrans,
                pref = pref,
                decommissioned = ref false }
in
    case wait_status (s, false) of
        true => (addServer s; SOME s)
      | false => NONE
end
267 : | blume | 645 | |
(* Have some idle server compile p.  The result is false if no server
 * could be grabbed; otherwise it is the status reported by the server
 * (with the server's output echoed on success). *)
fun compile p =
    case grab () of
        SOME s => let
            val target = fname (p, s)
        in
            Say.vsay ["[(", servName s, "): compiling ", target, "]\n"];
            send (s, concat ["compile ", target, "\n"]);
            wait_status (s, true)
        end
      | NONE => false
278 : | blume | 464 | |
(* Reset the concurrency machinery, then ping all servers and wait for
 * their replies (see wait_all). *)
fun reset is_int = let
    val () = Concur.reset ()
in
    wait_all is_int
end
280 : | |||
(* Run "st" on every currently idle server, one forked thread per
 * server, and wait for all of these threads.  The idle list is emptied
 * first; "reset" serves as the cleanup action. *)
fun startAll st = let
    val servers = !idle before idle := []
    val threads = map (fn s => Concur.fork (fn () => st s)) servers
in
    SafeIO.perform { openIt = fn () => (),
                     closeIt = fn () => (),
                     work = fn () => app Concur.wait threads,
                     cleanup = reset }
end
291 : | |||
(* Send a "cd" command for directory d (translated per server, see
 * fname) to all idle servers and wait for their status. *)
fun cd d =
    startAll (fn s =>
                 (send (s, concat ["cd ", fname (d, s), "\n"]);
                  ignore (wait_status (s, false))))
302 : | |||
(* Send a "cm" command with archos and the project path (translated per
 * server, see fname) to all idle servers and wait for their status. *)
fun cm { archos, project } =
    startAll (fn s =>
                 (send (s, concat ["cm ", archos, " ",
                                   fname (project, s), "\n"]);
                  ignore (wait_status (s, false))))
313 : | |||
(* Send a "cmb" command with archos and root to all idle servers and
 * wait for their status. *)
fun cmb { archos, root } = let
    val command = "cmb " ^ archos ^ " " ^ root ^ "\n"
in
    startAll (fn s => (send (s, command);
                       ignore (wait_status (s, false))))
end
321 : | |||
(* Send a "cmb" command that carries only archos to all idle servers
 * and wait for their status. *)
fun cmb_new { archos } = let
    val command = "cmb " ^ archos ^ "\n"
in
    startAll (fn s => (send (s, command);
                       ignore (wait_status (s, false))))
end
329 : | |||
(* Send a "dirbase" command with db to all idle servers and wait for
 * their status. *)
fun dirbase db = let
    val command = "dirbase " ^ db ^ "\n"
in
    startAll (fn s => (send (s, command);
                       ignore (wait_status (s, false))))
end
337 : | |||
(* Switch the use of compile servers on and off (see grab). *)
local
    fun setEnabled flag () = enabled := flag
in
    val enable = setEnabled true
    val disable = setEnabled false
end
340 : | |||
(* Run f with compile servers enabled; they are disabled again by the
 * closeIt action, and "reset" serves as the cleanup action. *)
fun withServers f =
    SafeIO.perform { work = f,
                     openIt = enable,
                     closeIt = disable,
                     cleanup = reset }
346 : | blume | 479 | |
(* The name of a server. *)
fun name s = servName s
348 : | blume | 464 | end |
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |