Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Diff of /sml/trunk/src/cm/concur/unix-servers.sml
ViewVC logotype

Diff of /sml/trunk/src/cm/concur/unix-servers.sml

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 464, Tue Nov 9 06:49:52 1999 UTC revision 651, Thu Jun 1 18:34:03 2000 UTC
# Line 9  Line 9 
9   *)   *)
10  structure Servers :> SERVERS = struct  structure Servers :> SERVERS = struct
11    
12        structure P = Posix
13    
14      type pathtrans = (string -> string) option      type pathtrans = (string -> string) option
15      datatype server = S of { name: string,      datatype server = S of { id: int,
16                                 name: string,
17                               proc: Unix.proc,                               proc: Unix.proc,
18                               pt: pathtrans,                               pt: pathtrans,
19                               pref: int }                               pref: int,
20                                 decommissioned: bool ref }
21    
22        fun servId (S { id, ... }) = id
23        fun decommission (S { decommissioned, ... }) = decommissioned := true
24        fun decommissioned (S { decommissioned = d, ... }) = !d
25        fun servName (S { name, ... }) = name
26        fun servPref (S { pref, ... }) = pref
27        fun servPT (S { pt, ... }) = pt
28        fun servProc (S { proc, ... }) = proc
29        val servIns = #1 o Unix.streamsOf o servProc
30        val servOuts = #2 o Unix.streamsOf o servProc
31    
32        val newId = let
33            val r = ref 0
34        in
35            fn () => let val i = !r in r := i + 1; i end
36        end
37      val enabled = ref false      val enabled = ref false
     val nservers = ref 0  
     val all = ref (StringMap.empty: server StringMap.map)  
38    
39      val idle = ref ([]: server list)      val idle = ref ([]: server list)
40      val someIdle = ref (Concur.ucond ())      val someIdle = ref (Concur.pcond ())
   
     fun fname (n, S { pt = NONE, ... }) = n  
       | fname (n, S { pt = SOME f, ... }) =  
         (if String.sub (n, 0) = #"/" then f n else n)  
         handle _ => n  
41    
42      fun servName (S { name, ... }) = name      local
43            val all = ref (IntMap.empty: server IntMap.map)
44            fun nservers () = IntMap.numItems (!all)
45        in
46            fun allIdle () = length (!idle) = nservers ()
47            fun noServers () = nservers () = 0
48            fun allServers () = IntMap.listItems (!all)
49            fun addServer s = let
50                val ns = nservers ()
51            in
52                all := IntMap.insert (!all, servId s, s)
53            end
54            fun delServer s =
55                (all := #1 (IntMap.remove (!all, servId s));
56                 (* If this was the last server we need to wake up
57                  * everyone who is currently waiting to grab a server.
58                  * The "grab"-loop will then gracefully fail and
59                  * not cause a deadlock. *)
60                 if noServers () then
61                     (Say.dsay ["No more servers -> back to sequential mode.\n"];
62                      Concur.signal (!someIdle))
63                 else ())
64        end
65    
66        (* This really shouldn't be here, but putting it into SrcPath would
67         * create a dependency cycle.  Some better structuring will fix this. *)
68        fun isAbsoluteDescr d =
69            (case String.sub (d, 0) of #"/" => true | #"%" => true | _ => false)
70            handle _ => false
71    
72        fun fname (n, s) =
73            case servPT s of
74                NONE => n
75              | SOME f => if isAbsoluteDescr n then f n else n
76    
77        (* protect some code segment from sigPIPE signals... *)
78        fun pprotect work = let
79            val pipe = UnixSignals.sigPIPE
80            fun disable () = Signals.setHandler (pipe, Signals.IGNORE)
81            fun reenable sa = ignore (Signals.setHandler (pipe, sa))
82        in
83            SafeIO.perform { openIt = disable, closeIt = reenable,
84                             work = fn _ => work (), cleanup = fn _ => () }
85        end
86    
87        (* Send a message to a slave. This must be sigPIPE-protected. *)
88      fun send (s, msg) = let      fun send (s, msg) = let
89          val S { name, proc = p, ... } = s          val outs = servOuts s
         val (_, outs) = Unix.streamsOf p  
         fun send0 m =  
             (Say.dsay ["-> ", name, " : ", m];  
              TextIO.output (outs, m))  
90      in      in
91          send0 msg;          Say.dsay ["-> ", servName s, " : ", msg];
92          TextIO.flushOut outs          pprotect (fn () =>
93                      (TextIO.output (outs, msg); TextIO.flushOut outs)
94                      handle _ => ())
95      end      end
96    
97      fun show_idle () =      fun show_idle () =
# Line 55  Line 109 
109      (* Grab an idle server; wait if necessary; reinitialize condition      (* Grab an idle server; wait if necessary; reinitialize condition
110       * if taking the only server. *)       * if taking the only server. *)
111      fun grab () =      fun grab () =
112          case !idle of          (* We need to check the following every time (at least the
113             * "noServers" part) because it might be that all servers
114             * have meanwhile gone away for some reason (crashed, etc.). *)
115            if not (!enabled) orelse noServers () then NONE
116            else case !idle of
117              [] => (Concur.wait (!someIdle); grab ())              [] => (Concur.wait (!someIdle); grab ())
118            | [only] =>            | [only] =>
119                  (Say.dsay ["Scheduler: taking last idle slave (",                  (Say.dsay ["Scheduler: taking last idle slave (",
120                             servName only, ").\n"];                             servName only, ").\n"];
121                   idle := [];                   idle := [];
122                   someIdle := Concur.ucond ();                   someIdle := Concur.pcond ();
123                   only)                   SOME only)
124            | first :: more => let            | first :: more => let
125                  fun best (b, [], rest) = (b, rest)                  fun best (b, [], rest) = (b, rest)
126                    | best (b, s :: r, rest) = let                    | best (b, s :: r, rest) = let
127                          val S { pref = bp, ... } = b                          val bp = servPref b
128                          val S { pref = sp, ... } = s                          val sp = servPref s
129                      in                      in
130                          if sp > bp then best (s, r, b :: rest)                          if sp > bp then best (s, r, b :: rest)
131                          else best (b, r, s :: rest)                          else best (b, r, s :: rest)
# Line 78  Line 136 
136                            servName b, ").\n"];                            servName b, ").\n"];
137                  idle := rest;                  idle := rest;
138                  show_idle ();                  show_idle ();
139                  b                  SOME b
140              end              end
141    
142      fun wait_status (s, echo) = let      fun wait_status (s, echo) = let
143          val S { name, proc = p, ... } = s          val name = servName s
144          val (ins, _) = Unix.streamsOf p          val ins = servIns s
145    
146          fun unexpected l = let          fun unexpected l = let
147              fun word (w, l) = " " :: w :: l              fun word (w, l) = " " :: w :: l
# Line 92  Line 150 
150                       name :: ":" :: foldr word ["\n"] l)                       name :: ":" :: foldr word ["\n"] l)
151          end          end
152    
153          fun crashed () =          fun serverExit () = let
154              (Say.say ["! Slave ", name, " has crashed\n"];              val what =
155               Unix.reap p)                  case pprotect (fn () => Unix.reap (servProc s)) of
156                        (P.Process.W_EXITED | P.Process.W_EXITSTATUS 0w0) =>
157                            "shut down"
158                      | _ => "crashed"
159            in
160                decommission s;
161                Say.say ["[!Slave ", name, " has ", what, ".]\n"];
162                delServer s
163            end
164    
165          val show =          val show =
166              if echo then (fn report => Say.say (rev report))              if echo then (fn report => Say.say (rev report))
167              else (fn _ => ())              else (fn _ => ())
168    
         fun wouldBlock () =  
             case TextIO.canInput (ins, 1) of  
                 NONE => true  
               | SOME 0 => true  
               | SOME _ => false  
   
169          fun loop report =          fun loop report =
170              if wouldBlock () then wait report              if decommissioned s then false
             else let  
                 val line = TextIO.inputLine ins  
             in  
                 if line = "" then (crashed (); false)  
171                  else                  else
172                    (Concur.wait (Concur.inputReady ins);
173                     case TextIO.inputLine ins of
174                         "" => (serverExit (); false)
175                       | line =>
176                      (Say.dsay ["<- ", name, ": ", line];                      (Say.dsay ["<- ", name, ": ", line];
177                       case String.tokens Char.isSpace line of                       case String.tokens Char.isSpace line of
178                           ["SLAVE:", "ok"] =>                           ["SLAVE:", "ok"] =>
# Line 123  Line 183 
183                                 * the report because it will be re-enacted                                 * the report because it will be re-enacted
184                                 * locally. *)                                 * locally. *)
185                                false)                                false)
186                         | "SLAVE:" :: l => (unexpected l;                              | "SLAVE:" :: l => (unexpected l; loop report)
187                                             loop report)                              | _ => loop (line :: report)))
                        | _ => loop (line :: report))  
             end  
   
         and wait report = (Concur.wait (Concur.inputReady ins);  
                            loop report)  
188      in      in
189          loop []          loop []
190      end      end
# Line 144  Line 199 
199       * (The race would happen when an interrupt occurs between receiving       * (The race would happen when an interrupt occurs between receiving
200       * "ok" and marking the corresponding slave idle). *)       * "ok" and marking the corresponding slave idle). *)
201      fun wait_all is_int = let      fun wait_all is_int = let
202          val al = StringMap.listItems (!all)          val al = allServers ()
203          fun ping (s as S { name, proc = p, ... }) = let          fun ping s = let
204              val (ins, _) = Unix.streamsOf p              val name = servName s
205              fun loop () = let              val ins = servIns s
206                  val line = TextIO.inputLine ins              fun loop () =
207              in                  if decommissioned s then ()
208                  Say.dsay ["<- ", name, ": ", line];                  else
209                        (Concur.wait (Concur.inputReady ins);
210                         case TextIO.inputLine ins of
211                             "" =>
212                                 (* server has gone away -> no pong *)
213                                 Say.dsay ["<-(EOF) ", name, "\n"]
214                           | line =>
215                                 (Say.dsay ["<- ", name, ": ", line];
216                  case String.tokens Char.isSpace line of                  case String.tokens Char.isSpace line of
217                      ["SLAVE:", "pong"] => ()                      ["SLAVE:", "pong"] => ()
218                    | _ => loop ()                                  | _ => loop ()))
             end  
219          in          in
220              send (s, "ping\n");              send (s, "ping\n");
221              loop ()              loop ()
222          end          end
223          val si = Concur.ucond ()          val si = Concur.pcond ()
224      in      in
225          if List.null al then ()          if List.null al then ()
226          else (Concur.signal si;          else (Concur.signal si;
# Line 172  Line 233 
233          someIdle := si          someIdle := si
234      end      end
235    
236      fun shutdown (name, method) = let      fun shutdown (s, method) = let
237          val (m, s) = StringMap.remove (!all, name)          val i = servId s
238          val S { proc = p, ... } = s          fun unidle () =
239          val (_, il) = List.partition (fn s => name = servName s) (!idle)              idle := #2 (List.partition (fn s' => servId s' = i) (!idle))
240      in          fun waitForExit () =
241          method s;              (unidle ();
242          ignore (Unix.reap p);               ignore (wait_status (s, false));
243          all := m;               if not (decommissioned s) then
244          nservers := !nservers - 1;                   waitForExit ()
245          idle := il               else ())
246      end handle LibBase.NotFound => ()      in
247            method ();
248      fun stop name =          waitForExit ()
249          shutdown (name, fn s => send (s, "shutdown\n"))      end
250    
251      fun kill name =      fun stop s =
252          shutdown (name, fn (S { proc = p, ... }) =>          shutdown (s, fn () => send (s, "shutdown\n"))
253                             Unix.kill (p, Posix.Signal.kill))  
254        fun kill s =
255            shutdown (s, fn () => Unix.kill (servProc s, P.Signal.term))
256    
257      fun start { name, cmd, pathtrans, pref } = let      fun start { name, cmd, pathtrans, pref } = let
         val _ = stop name  
258          val p = Unix.execute cmd          val p = Unix.execute cmd
259          val s = S { name = name, proc = p, pt = pathtrans, pref = pref }          val i = newId ()
260            val s = S { id = i, name = name,
261                        proc = p, pt = pathtrans, pref = pref,
262                        decommissioned = ref false }
263      in      in
264          if wait_status (s, false) then          if wait_status (s, false) then (addServer s; SOME s)
265              (all := StringMap.insert (!all, name, s);          else NONE
              nservers := 1 + !nservers;  
              true)  
         else false  
266      end      end
267    
268      fun compile p =      fun compile p =
269          if not (!enabled) orelse !nservers = 0 then false          case grab () of
270          else let              NONE => false
271              val s = grab ()            | SOME s => let
272              val f = fname (p, s)              val f = fname (p, s)
273          in          in
274              Say.vsay ["[(", servName s, "): compiling ", f, "]\n"];              Say.vsay ["[(", servName s, "): compiling ", f, "]\n"];
# Line 257  Line 319 
319          startAll st          startAll st
320      end      end
321    
322        fun cmb_new { archos } = let
323            fun st s =
324                (send (s, concat ["cmb ", archos, "\n"]);
325                 ignore (wait_status (s, false)))
326        in
327            startAll st
328        end
329    
330      fun dirbase db = let      fun dirbase db = let
331          fun st s =          fun st s =
332              (send (s, concat ["dirbase ", db, "\n"]);              (send (s, concat ["dirbase ", db, "\n"]);
# Line 273  Line 343 
343                           closeIt = disable,                           closeIt = disable,
344                           work = f,                           work = f,
345                           cleanup = reset }                           cleanup = reset }
346    
347        val name = servName
348  end  end

Legend:
Removed from v.464  
changed lines
  Added in v.651

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0