Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/concur/unix-servers.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/concur/unix-servers.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 479 - (view) (download)

1 : blume 464 (*
2 :     * Handling compile-servers under Unix- (and Unix-like) operating systems.
3 :     *
4 :     * This is still rather crude and not very robust.
5 :     *
6 :     * (C) 1999 Lucent Technologies, Bell Laboratories
7 :     *
8 :     * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
9 :     *)
10 :     structure Servers :> SERVERS = struct
11 :    
12 :     type pathtrans = (string -> string) option
13 :     datatype server = S of { name: string,
14 :     proc: Unix.proc,
15 :     pt: pathtrans,
16 :     pref: int }
17 :    
18 :     val enabled = ref false
19 :     val nservers = ref 0
20 :     val all = ref (StringMap.empty: server StringMap.map)
21 :    
22 :     val idle = ref ([]: server list)
23 : blume 478 val someIdle = ref (Concur.pcond ())
24 : blume 464
25 : blume 479 (* This really shouldn't be here, but putting it into SrcPath would
26 :     * create a dependency cycle. Some better structuring will fix this. *)
27 :     fun isAbsoluteDescr d =
28 :     (case String.sub (d, 0) of #"/" => true | #"%" => true | _ => false)
29 :     handle _ => false
30 : blume 464
31 :     fun servName (S { name, ... }) = name
32 : blume 479 fun servPref (S { pref, ... }) = pref
33 :     fun servPT (S { pt, ... }) = pt
34 :     fun servProc (S { proc, ... }) = proc
35 :     val servIns = #1 o Unix.streamsOf o servProc
36 :     val servOuts = #2 o Unix.streamsOf o servProc
37 : blume 464
38 : blume 479 fun fname (n, s) =
39 :     case servPT s of
40 :     NONE => n
41 :     | SOME f => if isAbsoluteDescr n then f n else n
42 :    
43 : blume 464 fun send (s, msg) = let
44 : blume 479 val outs = servOuts s
45 : blume 464 in
46 : blume 479 Say.dsay ["-> ", servName s, " : ", msg];
47 :     TextIO.output (outs, msg);
48 : blume 464 TextIO.flushOut outs
49 :     end
50 :    
51 :     fun show_idle () =
52 :     Say.dsay ("Idle:" ::
53 :     foldr (fn (s, l) => " " :: servName s :: l) ["\n"] (!idle))
54 :    
55 :     (* Mark a server idle; signal all those who are currently waiting for
56 :     * that...*)
57 :     fun mark_idle s =
58 :     (idle := s :: !idle;
59 :     Concur.signal (!someIdle);
60 :     Say.dsay ["Scheduler: slave ", servName s, " has become idle.\n"];
61 :     show_idle ())
62 :    
63 :     (* Grab an idle server; wait if necessary; reinitialize condition
64 :     * if taking the only server. *)
65 :     fun grab () =
66 :     case !idle of
67 :     [] => (Concur.wait (!someIdle); grab ())
68 :     | [only] =>
69 :     (Say.dsay ["Scheduler: taking last idle slave (",
70 :     servName only, ").\n"];
71 :     idle := [];
72 : blume 478 someIdle := Concur.pcond ();
73 : blume 464 only)
74 :     | first :: more => let
75 :     fun best (b, [], rest) = (b, rest)
76 :     | best (b, s :: r, rest) = let
77 : blume 479 val bp = servPref b
78 :     val sp = servPref s
79 : blume 464 in
80 :     if sp > bp then best (s, r, b :: rest)
81 :     else best (b, r, s :: rest)
82 :     end
83 :     val (b, rest) = best (first, more, [])
84 :     in
85 :     Say.dsay ["Scheduler: taking idle slave (",
86 :     servName b, ").\n"];
87 :     idle := rest;
88 :     show_idle ();
89 :     b
90 :     end
91 :    
92 :     fun wait_status (s, echo) = let
93 : blume 479 val name = servName s
94 :     val ins = servIns s
95 : blume 464
96 :     fun unexpected l = let
97 :     fun word (w, l) = " " :: w :: l
98 :     in
99 :     Say.say ("! Unexpected response from slave " ::
100 :     name :: ":" :: foldr word ["\n"] l)
101 :     end
102 :    
103 :     fun crashed () =
104 :     (Say.say ["! Slave ", name, " has crashed\n"];
105 : blume 479 Unix.reap (servProc s))
106 : blume 464
107 :     val show =
108 :     if echo then (fn report => Say.say (rev report))
109 :     else (fn _ => ())
110 :    
111 :     fun wouldBlock () =
112 :     case TextIO.canInput (ins, 1) of
113 :     NONE => true
114 :     | SOME 0 => true
115 :     | SOME _ => false
116 :    
117 :     fun loop report =
118 :     if wouldBlock () then wait report
119 :     else let
120 :     val line = TextIO.inputLine ins
121 :     in
122 :     if line = "" then (crashed (); false)
123 :     else
124 :     (Say.dsay ["<- ", name, ": ", line];
125 :     case String.tokens Char.isSpace line of
126 :     ["SLAVE:", "ok"] =>
127 :     (mark_idle s; show report; true)
128 :     | ["SLAVE:", "error"] =>
129 :     (mark_idle s;
130 :     (* In the case of error we don't show
131 :     * the report because it will be re-enacted
132 :     * locally. *)
133 :     false)
134 :     | "SLAVE:" :: l => (unexpected l;
135 :     loop report)
136 :     | _ => loop (line :: report))
137 :     end
138 :    
139 :     and wait report = (Concur.wait (Concur.inputReady ins);
140 :     loop report)
141 :     in
142 :     loop []
143 :     end
144 :    
145 :     (* Send a "ping" to all servers and wait for the "pong" responses.
146 :     * This should work for all servers, busy or no. Busy servers will
147 :     * take longer to respond because they first need to finish what
148 :     * they are doing.
149 :     * We use wait_all after we receive an interrupt signal. The ping-pong
150 :     * protocol does not suffer from the race condition that we would have
151 :     * if we wanted to only wait for "ok"s from currently busy servers.
152 :     * (The race would happen when an interrupt occurs between receiving
153 :     * "ok" and marking the corresponding slave idle). *)
154 :     fun wait_all is_int = let
155 :     val al = StringMap.listItems (!all)
156 : blume 479 fun ping s = let
157 :     val name = servName s
158 :     val ins = servIns s
159 : blume 464 fun loop () = let
160 :     val line = TextIO.inputLine ins
161 :     in
162 :     Say.dsay ["<- ", name, ": ", line];
163 :     case String.tokens Char.isSpace line of
164 :     ["SLAVE:", "pong"] => ()
165 :     | _ => loop ()
166 :     end
167 :     in
168 :     send (s, "ping\n");
169 :     loop ()
170 :     end
171 : blume 478 val si = Concur.pcond ()
172 : blume 464 in
173 :     if List.null al then ()
174 :     else (Concur.signal si;
175 :     if is_int then
176 :     Say.say
177 :     ["Waiting for attached servers to become idle...\n"]
178 :     else ());
179 :     app ping al;
180 :     idle := al;
181 :     someIdle := si
182 :     end
183 :    
184 :     fun shutdown (name, method) = let
185 :     val (m, s) = StringMap.remove (!all, name)
186 : blume 479 val p = servProc s
187 : blume 464 val (_, il) = List.partition (fn s => name = servName s) (!idle)
188 :     in
189 :     method s;
190 :     ignore (Unix.reap p);
191 :     all := m;
192 :     nservers := !nservers - 1;
193 :     idle := il
194 :     end handle LibBase.NotFound => ()
195 :    
196 : blume 479 fun stop_by_name name = shutdown (name, fn s => send (s, "shutdown\n"))
197 : blume 464
198 : blume 479 fun stop s = stop_by_name (servName s)
199 : blume 464
200 : blume 479 fun kill s = shutdown (servName s,
201 :     fn s => Unix.kill (servProc s, Posix.Signal.term))
202 :    
203 : blume 464 fun start { name, cmd, pathtrans, pref } = let
204 : blume 479 val _ = stop_by_name name
205 : blume 464 val p = Unix.execute cmd
206 :     val s = S { name = name, proc = p, pt = pathtrans, pref = pref }
207 :     in
208 :     if wait_status (s, false) then
209 :     (all := StringMap.insert (!all, name, s);
210 :     nservers := 1 + !nservers;
211 : blume 479 SOME s)
212 :     else NONE
213 : blume 464 end
214 :    
215 :     fun compile p =
216 :     if not (!enabled) orelse !nservers = 0 then false
217 :     else let
218 :     val s = grab ()
219 :     val f = fname (p, s)
220 :     in
221 :     Say.vsay ["[(", servName s, "): compiling ", f, "]\n"];
222 :     send (s, concat ["compile ", f, "\n"]);
223 :     wait_status (s, true)
224 :     end
225 :    
226 :     fun reset is_int = (Concur.reset (); wait_all is_int)
227 :    
228 :     fun startAll st = let
229 :     val l = !idle
230 :     val _ = idle := []
231 :     val tl = map (fn s => Concur.fork (fn () => st s)) l
232 :     in
233 :     SafeIO.perform { openIt = fn () => (),
234 :     closeIt = fn () => (),
235 :     work = fn () => app Concur.wait tl,
236 :     cleanup = reset }
237 :     end
238 :    
239 :     fun cd d = let
240 :     fun st s = let
241 :     val d' = fname (d, s)
242 :     in
243 :     send (s, concat ["cd ", d', "\n"]);
244 :     ignore (wait_status (s, false))
245 :     end
246 :     in
247 :     startAll st
248 :     end
249 :    
250 :     fun cm { archos, project } = let
251 :     fun st s = let
252 :     val f = fname (project, s)
253 :     in
254 :     send (s, concat ["cm ", archos, " ", f, "\n"]);
255 :     ignore (wait_status (s, false))
256 :     end
257 :     in
258 :     startAll st
259 :     end
260 :    
261 :     fun cmb { archos, root } = let
262 :     fun st s =
263 :     (send (s, concat ["cmb ", archos, " ", root, "\n"]);
264 :     ignore (wait_status (s, false)))
265 :     in
266 :     startAll st
267 :     end
268 :    
269 :     fun dirbase db = let
270 :     fun st s =
271 :     (send (s, concat ["dirbase ", db, "\n"]);
272 :     ignore (wait_status (s, false)))
273 :     in
274 :     startAll st
275 :     end
276 :    
277 :     fun enable () = enabled := true
278 :     fun disable () = enabled := false
279 :    
280 :     fun withServers f =
281 :     SafeIO.perform { openIt = enable,
282 :     closeIt = disable,
283 :     work = f,
284 :     cleanup = reset }
285 : blume 479
286 :     val name = servName
287 : blume 464 end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0