9 |
*) |
*) |
10 |
structure Servers :> SERVERS = struct |
structure Servers :> SERVERS = struct |
11 |
|
|
12 |
|
structure P = Posix |
13 |
|
|
14 |
type pathtrans = (string -> string) option |
type pathtrans = (string -> string) option |
15 |
datatype server = S of { name: string, |
datatype server = S of { id: int, |
16 |
|
name: string, |
17 |
proc: Unix.proc, |
proc: Unix.proc, |
18 |
pt: pathtrans, |
pt: pathtrans, |
19 |
pref: int } |
pref: int, |
20 |
|
decommissioned: bool ref } |
21 |
|
|
22 |
|
fun servId (S { id, ... }) = id |
23 |
|
fun decommission (S { decommissioned, ... }) = decommissioned := true |
24 |
|
fun decommissioned (S { decommissioned = d, ... }) = !d |
25 |
|
fun servName (S { name, ... }) = name |
26 |
|
fun servPref (S { pref, ... }) = pref |
27 |
|
fun servPT (S { pt, ... }) = pt |
28 |
|
fun servProc (S { proc, ... }) = proc |
29 |
|
val servIns = #1 o Unix.streamsOf o servProc |
30 |
|
val servOuts = #2 o Unix.streamsOf o servProc |
31 |
|
|
32 |
|
val newId = let |
33 |
|
val r = ref 0 |
34 |
|
in |
35 |
|
fn () => let val i = !r in r := i + 1; i end |
36 |
|
end |
37 |
val enabled = ref false |
val enabled = ref false |
38 |
|
|
39 |
|
local |
40 |
val nservers = ref 0 |
val nservers = ref 0 |
41 |
val all = ref (StringMap.empty: server StringMap.map) |
val all = ref (IntRedBlackMap.empty: server IntRedBlackMap.map) |
42 |
|
in |
43 |
|
fun noServers () = !nservers = 0 |
44 |
|
fun allServers () = IntRedBlackMap.listItems (!all) |
45 |
|
fun addServer s = let |
46 |
|
val ns = !nservers |
47 |
|
in |
48 |
|
nservers := ns + 1; |
49 |
|
all := IntRedBlackMap.insert (!all, servId s, s) |
50 |
|
end |
51 |
|
fun delServer s = let |
52 |
|
val ns = !nservers - 1 |
53 |
|
in |
54 |
|
all := #1 (IntRedBlackMap.remove (!all, servId s)); |
55 |
|
nservers := ns |
56 |
|
end |
57 |
|
end |
58 |
|
|
59 |
val idle = ref ([]: server list) |
val idle = ref ([]: server list) |
60 |
val someIdle = ref (Concur.pcond ()) |
val someIdle = ref (Concur.pcond ()) |
65 |
(case String.sub (d, 0) of #"/" => true | #"%" => true | _ => false) |
(case String.sub (d, 0) of #"/" => true | #"%" => true | _ => false) |
66 |
handle _ => false |
handle _ => false |
67 |
|
|
|
fun servName (S { name, ... }) = name |
|
|
fun servPref (S { pref, ... }) = pref |
|
|
fun servPT (S { pt, ... }) = pt |
|
|
fun servProc (S { proc, ... }) = proc |
|
|
val servIns = #1 o Unix.streamsOf o servProc |
|
|
val servOuts = #2 o Unix.streamsOf o servProc |
|
|
|
|
68 |
fun fname (n, s) = |
fun fname (n, s) = |
69 |
case servPT s of |
case servPT s of |
70 |
NONE => n |
NONE => n |
71 |
| SOME f => if isAbsoluteDescr n then f n else n |
| SOME f => if isAbsoluteDescr n then f n else n |
72 |
|
|
73 |
|
fun pprotect work = let |
74 |
|
val pipe = UnixSignals.sigPIPE |
75 |
|
fun disable () = Signals.setHandler (pipe, Signals.IGNORE) |
76 |
|
fun reenable sa = ignore (Signals.setHandler (pipe, sa)) |
77 |
|
in |
78 |
|
SafeIO.perform { openIt = disable, closeIt = reenable, |
79 |
|
work = fn _ => work (), cleanup = fn _ => () } |
80 |
|
end |
81 |
|
|
82 |
fun send (s, msg) = let |
fun send (s, msg) = let |
83 |
val outs = servOuts s |
val outs = servOuts s |
84 |
in |
in |
85 |
Say.dsay ["-> ", servName s, " : ", msg]; |
Say.dsay ["-> ", servName s, " : ", msg]; |
86 |
TextIO.output (outs, msg); |
pprotect (fn () => |
87 |
TextIO.flushOut outs |
(TextIO.output (outs, msg); TextIO.flushOut outs) |
88 |
|
handle _ => ()) |
89 |
end |
end |
90 |
|
|
91 |
fun show_idle () = |
fun show_idle () = |
140 |
name :: ":" :: foldr word ["\n"] l) |
name :: ":" :: foldr word ["\n"] l) |
141 |
end |
end |
142 |
|
|
143 |
fun crashed () = |
fun serverExit () = let |
144 |
(Say.say ["! Slave ", name, " has crashed\n"]; |
val what = |
145 |
Unix.reap (servProc s)) |
case pprotect (fn () => Unix.reap (servProc s)) of |
146 |
|
(P.Process.W_EXITED | P.Process.W_EXITSTATUS 0w0) => |
147 |
|
"shut down" |
148 |
|
| _ => "crashed" |
149 |
|
in |
150 |
|
decommission s; |
151 |
|
Say.say ["! Slave ", name, " has ", what, ".\n"]; |
152 |
|
delServer s |
153 |
|
end |
154 |
|
|
155 |
val show = |
val show = |
156 |
if echo then (fn report => Say.say (rev report)) |
if echo then (fn report => Say.say (rev report)) |
157 |
else (fn _ => ()) |
else (fn _ => ()) |
158 |
|
|
|
fun wouldBlock () = |
|
|
case TextIO.canInput (ins, 1) of |
|
|
NONE => true |
|
|
| SOME 0 => true |
|
|
| SOME _ => false |
|
|
|
|
159 |
fun loop report = |
fun loop report = |
160 |
if wouldBlock () then wait report |
if decommissioned s then false |
|
else let |
|
|
val line = TextIO.inputLine ins |
|
|
in |
|
|
if line = "" then (crashed (); false) |
|
161 |
else |
else |
162 |
|
(Concur.wait (Concur.inputReady ins); |
163 |
|
case TextIO.inputLine ins of |
164 |
|
"" => (serverExit (); false) |
165 |
|
| line => |
166 |
(Say.dsay ["<- ", name, ": ", line]; |
(Say.dsay ["<- ", name, ": ", line]; |
167 |
case String.tokens Char.isSpace line of |
case String.tokens Char.isSpace line of |
168 |
["SLAVE:", "ok"] => |
["SLAVE:", "ok"] => |
173 |
* the report because it will be re-enacted |
* the report because it will be re-enacted |
174 |
* locally. *) |
* locally. *) |
175 |
false) |
false) |
176 |
| "SLAVE:" :: l => (unexpected l; |
| "SLAVE:" :: l => (unexpected l; loop report) |
177 |
loop report) |
| _ => loop (line :: report))) |
|
| _ => loop (line :: report)) |
|
|
end |
|
|
|
|
|
and wait report = (Concur.wait (Concur.inputReady ins); |
|
|
loop report) |
|
178 |
in |
in |
179 |
loop [] |
loop [] |
180 |
end |
end |
189 |
* (The race would happen when an interrupt occurs between receiving |
* (The race would happen when an interrupt occurs between receiving |
190 |
* "ok" and marking the corresponding slave idle). *) |
* "ok" and marking the corresponding slave idle). *) |
191 |
fun wait_all is_int = let |
fun wait_all is_int = let |
192 |
val al = StringMap.listItems (!all) |
val al = allServers () |
193 |
fun ping s = let |
fun ping s = let |
194 |
val name = servName s |
val name = servName s |
195 |
val ins = servIns s |
val ins = servIns s |
196 |
fun loop () = let |
fun loop () = |
197 |
val line = TextIO.inputLine ins |
if decommissioned s then () |
198 |
in |
else |
199 |
Say.dsay ["<- ", name, ": ", line]; |
(Concur.wait (Concur.inputReady ins); |
200 |
|
case TextIO.inputLine ins of |
201 |
|
"" => |
202 |
|
(* server has gone away -> no pong *) |
203 |
|
Say.dsay ["<-(EOF) ", name, "\n"] |
204 |
|
| line => |
205 |
|
(Say.dsay ["<- ", name, ": ", line]; |
206 |
case String.tokens Char.isSpace line of |
case String.tokens Char.isSpace line of |
207 |
["SLAVE:", "pong"] => () |
["SLAVE:", "pong"] => () |
208 |
| _ => loop () |
| _ => loop ())) |
|
end |
|
209 |
in |
in |
210 |
send (s, "ping\n"); |
send (s, "ping\n"); |
211 |
loop () |
loop () |
223 |
someIdle := si |
someIdle := si |
224 |
end |
end |
225 |
|
|
226 |
fun shutdown (name, method) = let |
fun shutdown (s, method) = let |
227 |
val (m, s) = StringMap.remove (!all, name) |
val i = servId s |
228 |
val p = servProc s |
fun unidle () = |
229 |
val (_, il) = List.partition (fn s => name = servName s) (!idle) |
idle := #2 (List.partition (fn s' => servId s' = i) (!idle)) |
230 |
in |
fun waitForExit () = |
231 |
method s; |
(unidle (); |
232 |
ignore (Unix.reap p); |
ignore (wait_status (s, false)); |
233 |
all := m; |
if not (decommissioned s) then |
234 |
nservers := !nservers - 1; |
waitForExit () |
235 |
idle := il |
else ()) |
236 |
end handle LibBase.NotFound => () |
in |
237 |
|
method (); |
238 |
fun stop_by_name name = shutdown (name, fn s => send (s, "shutdown\n")) |
waitForExit () |
239 |
|
end |
240 |
|
|
241 |
fun stop s = stop_by_name (servName s) |
fun stop s = |
242 |
|
shutdown (s, fn () => send (s, "shutdown\n")) |
243 |
|
|
244 |
fun kill s = shutdown (servName s, |
fun kill s = |
245 |
fn s => Unix.kill (servProc s, Posix.Signal.term)) |
shutdown (s, fn () => Unix.kill (servProc s, P.Signal.term)) |
246 |
|
|
247 |
fun start { name, cmd, pathtrans, pref } = let |
fun start { name, cmd, pathtrans, pref } = let |
|
val _ = stop_by_name name |
|
248 |
val p = Unix.execute cmd |
val p = Unix.execute cmd |
249 |
val s = S { name = name, proc = p, pt = pathtrans, pref = pref } |
val i = newId () |
250 |
|
val s = S { id = i, name = name, |
251 |
|
proc = p, pt = pathtrans, pref = pref, |
252 |
|
decommissioned = ref false } |
253 |
in |
in |
254 |
if wait_status (s, false) then |
if wait_status (s, false) then (addServer s; SOME s) |
|
(all := StringMap.insert (!all, name, s); |
|
|
nservers := 1 + !nservers; |
|
|
SOME s) |
|
255 |
else NONE |
else NONE |
256 |
end |
end |
257 |
|
|
258 |
fun compile p = |
fun compile p = |
259 |
if not (!enabled) orelse !nservers = 0 then false |
if not (!enabled) orelse noServers () then false |
260 |
else let |
else let |
261 |
val s = grab () |
val s = grab () |
262 |
val f = fname (p, s) |
val f = fname (p, s) |