Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/concur/unix-servers.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/concur/unix-servers.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 464 - (view) (download)

1 : blume 464 (*
2 :     * Handling compile-servers under Unix- (and Unix-like) operating systems.
3 :     *
4 :     * This is still rather crude and not very robust.
5 :     *
6 :     * (C) 1999 Lucent Technologies, Bell Laboratories
7 :     *
8 :     * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
9 :     *)
10 :     structure Servers :> SERVERS = struct
11 :    
12 :     type pathtrans = (string -> string) option
13 :     datatype server = S of { name: string,
14 :     proc: Unix.proc,
15 :     pt: pathtrans,
16 :     pref: int }
17 :    
18 :     val enabled = ref false
19 :     val nservers = ref 0
20 :     val all = ref (StringMap.empty: server StringMap.map)
21 :    
22 :     val idle = ref ([]: server list)
23 :     val someIdle = ref (Concur.ucond ())
24 :    
25 :     fun fname (n, S { pt = NONE, ... }) = n
26 :     | fname (n, S { pt = SOME f, ... }) =
27 :     (if String.sub (n, 0) = #"/" then f n else n)
28 :     handle _ => n
29 :    
30 :     fun servName (S { name, ... }) = name
31 :    
32 :     fun send (s, msg) = let
33 :     val S { name, proc = p, ... } = s
34 :     val (_, outs) = Unix.streamsOf p
35 :     fun send0 m =
36 :     (Say.dsay ["-> ", name, " : ", m];
37 :     TextIO.output (outs, m))
38 :     in
39 :     send0 msg;
40 :     TextIO.flushOut outs
41 :     end
42 :    
43 :     fun show_idle () =
44 :     Say.dsay ("Idle:" ::
45 :     foldr (fn (s, l) => " " :: servName s :: l) ["\n"] (!idle))
46 :    
47 :     (* Mark a server idle; signal all those who are currently waiting for
48 :     * that...*)
49 :     fun mark_idle s =
50 :     (idle := s :: !idle;
51 :     Concur.signal (!someIdle);
52 :     Say.dsay ["Scheduler: slave ", servName s, " has become idle.\n"];
53 :     show_idle ())
54 :    
55 :     (* Grab an idle server; wait if necessary; reinitialize condition
56 :     * if taking the only server. *)
57 :     fun grab () =
58 :     case !idle of
59 :     [] => (Concur.wait (!someIdle); grab ())
60 :     | [only] =>
61 :     (Say.dsay ["Scheduler: taking last idle slave (",
62 :     servName only, ").\n"];
63 :     idle := [];
64 :     someIdle := Concur.ucond ();
65 :     only)
66 :     | first :: more => let
67 :     fun best (b, [], rest) = (b, rest)
68 :     | best (b, s :: r, rest) = let
69 :     val S { pref = bp, ... } = b
70 :     val S { pref = sp, ... } = s
71 :     in
72 :     if sp > bp then best (s, r, b :: rest)
73 :     else best (b, r, s :: rest)
74 :     end
75 :     val (b, rest) = best (first, more, [])
76 :     in
77 :     Say.dsay ["Scheduler: taking idle slave (",
78 :     servName b, ").\n"];
79 :     idle := rest;
80 :     show_idle ();
81 :     b
82 :     end
83 :    
84 :     fun wait_status (s, echo) = let
85 :     val S { name, proc = p, ... } = s
86 :     val (ins, _) = Unix.streamsOf p
87 :    
88 :     fun unexpected l = let
89 :     fun word (w, l) = " " :: w :: l
90 :     in
91 :     Say.say ("! Unexpected response from slave " ::
92 :     name :: ":" :: foldr word ["\n"] l)
93 :     end
94 :    
95 :     fun crashed () =
96 :     (Say.say ["! Slave ", name, " has crashed\n"];
97 :     Unix.reap p)
98 :    
99 :     val show =
100 :     if echo then (fn report => Say.say (rev report))
101 :     else (fn _ => ())
102 :    
103 :     fun wouldBlock () =
104 :     case TextIO.canInput (ins, 1) of
105 :     NONE => true
106 :     | SOME 0 => true
107 :     | SOME _ => false
108 :    
109 :     fun loop report =
110 :     if wouldBlock () then wait report
111 :     else let
112 :     val line = TextIO.inputLine ins
113 :     in
114 :     if line = "" then (crashed (); false)
115 :     else
116 :     (Say.dsay ["<- ", name, ": ", line];
117 :     case String.tokens Char.isSpace line of
118 :     ["SLAVE:", "ok"] =>
119 :     (mark_idle s; show report; true)
120 :     | ["SLAVE:", "error"] =>
121 :     (mark_idle s;
122 :     (* In the case of error we don't show
123 :     * the report because it will be re-enacted
124 :     * locally. *)
125 :     false)
126 :     | "SLAVE:" :: l => (unexpected l;
127 :     loop report)
128 :     | _ => loop (line :: report))
129 :     end
130 :    
131 :     and wait report = (Concur.wait (Concur.inputReady ins);
132 :     loop report)
133 :     in
134 :     loop []
135 :     end
136 :    
137 :     (* Send a "ping" to all servers and wait for the "pong" responses.
138 :     * This should work for all servers, busy or no. Busy servers will
139 :     * take longer to respond because they first need to finish what
140 :     * they are doing.
141 :     * We use wait_all after we receive an interrupt signal. The ping-pong
142 :     * protocol does not suffer from the race condition that we would have
143 :     * if we wanted to only wait for "ok"s from currently busy servers.
144 :     * (The race would happen when an interrupt occurs between receiving
145 :     * "ok" and marking the corresponding slave idle). *)
146 :     fun wait_all is_int = let
147 :     val al = StringMap.listItems (!all)
148 :     fun ping (s as S { name, proc = p, ... }) = let
149 :     val (ins, _) = Unix.streamsOf p
150 :     fun loop () = let
151 :     val line = TextIO.inputLine ins
152 :     in
153 :     Say.dsay ["<- ", name, ": ", line];
154 :     case String.tokens Char.isSpace line of
155 :     ["SLAVE:", "pong"] => ()
156 :     | _ => loop ()
157 :     end
158 :     in
159 :     send (s, "ping\n");
160 :     loop ()
161 :     end
162 :     val si = Concur.ucond ()
163 :     in
164 :     if List.null al then ()
165 :     else (Concur.signal si;
166 :     if is_int then
167 :     Say.say
168 :     ["Waiting for attached servers to become idle...\n"]
169 :     else ());
170 :     app ping al;
171 :     idle := al;
172 :     someIdle := si
173 :     end
174 :    
175 :     fun shutdown (name, method) = let
176 :     val (m, s) = StringMap.remove (!all, name)
177 :     val S { proc = p, ... } = s
178 :     val (_, il) = List.partition (fn s => name = servName s) (!idle)
179 :     in
180 :     method s;
181 :     ignore (Unix.reap p);
182 :     all := m;
183 :     nservers := !nservers - 1;
184 :     idle := il
185 :     end handle LibBase.NotFound => ()
186 :    
187 :     fun stop name =
188 :     shutdown (name, fn s => send (s, "shutdown\n"))
189 :    
190 :     fun kill name =
191 :     shutdown (name, fn (S { proc = p, ... }) =>
192 :     Unix.kill (p, Posix.Signal.kill))
193 :    
194 :     fun start { name, cmd, pathtrans, pref } = let
195 :     val _ = stop name
196 :     val p = Unix.execute cmd
197 :     val s = S { name = name, proc = p, pt = pathtrans, pref = pref }
198 :     in
199 :     if wait_status (s, false) then
200 :     (all := StringMap.insert (!all, name, s);
201 :     nservers := 1 + !nservers;
202 :     true)
203 :     else false
204 :     end
205 :    
206 :     fun compile p =
207 :     if not (!enabled) orelse !nservers = 0 then false
208 :     else let
209 :     val s = grab ()
210 :     val f = fname (p, s)
211 :     in
212 :     Say.vsay ["[(", servName s, "): compiling ", f, "]\n"];
213 :     send (s, concat ["compile ", f, "\n"]);
214 :     wait_status (s, true)
215 :     end
216 :    
217 :     fun reset is_int = (Concur.reset (); wait_all is_int)
218 :    
219 :     fun startAll st = let
220 :     val l = !idle
221 :     val _ = idle := []
222 :     val tl = map (fn s => Concur.fork (fn () => st s)) l
223 :     in
224 :     SafeIO.perform { openIt = fn () => (),
225 :     closeIt = fn () => (),
226 :     work = fn () => app Concur.wait tl,
227 :     cleanup = reset }
228 :     end
229 :    
230 :     fun cd d = let
231 :     fun st s = let
232 :     val d' = fname (d, s)
233 :     in
234 :     send (s, concat ["cd ", d', "\n"]);
235 :     ignore (wait_status (s, false))
236 :     end
237 :     in
238 :     startAll st
239 :     end
240 :    
241 :     fun cm { archos, project } = let
242 :     fun st s = let
243 :     val f = fname (project, s)
244 :     in
245 :     send (s, concat ["cm ", archos, " ", f, "\n"]);
246 :     ignore (wait_status (s, false))
247 :     end
248 :     in
249 :     startAll st
250 :     end
251 :    
252 :     fun cmb { archos, root } = let
253 :     fun st s =
254 :     (send (s, concat ["cmb ", archos, " ", root, "\n"]);
255 :     ignore (wait_status (s, false)))
256 :     in
257 :     startAll st
258 :     end
259 :    
260 :     fun dirbase db = let
261 :     fun st s =
262 :     (send (s, concat ["dirbase ", db, "\n"]);
263 :     ignore (wait_status (s, false)))
264 :     in
265 :     startAll st
266 :     end
267 :    
268 :     fun enable () = enabled := true
269 :     fun disable () = enabled := false
270 :    
271 :     fun withServers f =
272 :     SafeIO.perform { openIt = enable,
273 :     closeIt = disable,
274 :     work = f,
275 :     cleanup = reset }
276 :     end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0