Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Diff of /sml/trunk/src/cm/concur/concur.sml
ViewVC logotype

Diff of /sml/trunk/src/cm/concur/concur.sml

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 464, Tue Nov 9 06:49:52 1999 UTC revision 632, Sat Apr 29 15:50:42 2000 UTC
# Line 17  Line 17 
17   *)   *)
18  signature CONCUR = sig  signature CONCUR = sig
19    
20        (* "private" is essentially the same as "unit", but we use it
21         * to make sure that only "pcond"-generated conditions are
22         * going to be explicitly signalled via "signal" *)
23        type private
24    
25      type 'a cond                        (* condition with value *)      type 'a cond                        (* condition with value *)
26    
27      val fork : (unit -> 'a) -> 'a cond  (* termination condition with value      val fork : (unit -> 'a) -> 'a cond  (* termination condition with value
# Line 28  Line 33 
33                                           * when waiting using "wait") *)                                           * when waiting using "wait") *)
34    
35      val inputReady : TextIO.instream -> unit cond      val inputReady : TextIO.instream -> unit cond
36      val ucond : unit -> unit cond  
37      val signal : unit cond -> unit      val pcond : unit -> private cond
38        val signal : private cond -> unit
39    
40      (* forget all waiting threads and input conditions *)      (* forget all waiting threads and input conditions *)
41      val reset : unit -> unit      val reset : unit -> unit
42    
43        (* check whether there are any (other) runable tasks... *)
44        val noTasks : unit -> bool
45  end  end
46    
47  structure Concur :> CONCUR = struct  structure Concur :> CONCUR = struct
48    
49        type private = unit
50    
51      type tstate = unit SMLofNJ.Cont.cont * int      type tstate = unit SMLofNJ.Cont.cont * int
52    
53      datatype 'a cstate =      datatype 'a cstate =
# Line 64  Line 75 
75    
76      fun reset () = (runable := []; inputs := [])      fun reset () = (runable := []; inputs := [])
77    
78        fun noTasks () = List.null (!runable)
79    
80      (* we heavily favor non-I/O conditions, but that's ok for our purposes *)      (* we heavily favor non-I/O conditions, but that's ok for our purposes *)
81    
82      fun wakeup (ref (Arrived _), _) =      fun wakeup (ref (Arrived _), _) =
# Line 71  Line 84 
84        | wakeup (r as ref (Waiting tsl), v) =        | wakeup (r as ref (Waiting tsl), v) =
85          (r := Arrived v; app (fn ts => enqueue (ts, runable)) tsl)          (r := Arrived v; app (fn ts => enqueue (ts, runable)) tsl)
86    
87      fun ucond () = (ref (Waiting [])) : unit cond      fun pcond () = (ref (Waiting [])) : private cond
88      fun signal (ref (Arrived ())) = ()      fun signal (ref (Arrived ())) = ()
89        | signal uc = wakeup (uc, ())        | signal uc = wakeup (uc, ())
90    
# Line 122  Line 135 
135      fun fork worker = let      fun fork worker = let
136          val c = ref (Waiting [])          val c = ref (Waiting [])
137      in      in
         (* We give new workers a low priority so that any threads that  
          * were already running but are now waiting for some event  
          * get control first if they are re-enabled.  This is because  
          * waiting threads will clean up after errors in which case  
          * we don't want new threads to run off. *)  
138          SMLofNJ.Cont.callcc (fn return =>          SMLofNJ.Cont.callcc (fn return =>
139            (SMLofNJ.Cont.callcc (fn ts => (enqueue ((ts, ~1), runable);            (SMLofNJ.Cont.callcc (fn ts => (enqueue ((ts, ~1), runable);
140                                            SMLofNJ.Cont.throw return c));                                            SMLofNJ.Cont.throw return c));
# Line 136  Line 144 
144    
145      fun inputReady iis = let      fun inputReady iis = let
146          val fis = TextIO.getInstream iis          val fis = TextIO.getInstream iis
         val (r, v) = TextIO.StreamIO.getReader fis  
147          fun bad () = (Say.say ["inputReady: bad stream\n"];          fun bad () = (Say.say ["inputReady: bad stream\n"];
148                        raise Fail "concur")                        raise Fail "concur")
149      in          val rv = TextIO.StreamIO.getReader fis
150          case r of          val c = case rv of
151              TextPrimIO.RD { ioDesc = SOME d, ... } =>              (TextPrimIO.RD { ioDesc = SOME d, ... }, "") =>
152                  (case OS.IO.pollDesc d of                  (case OS.IO.pollDesc d of
153                       NONE => bad ()                       NONE => bad ()
154                     | SOME pd => let                     | SOME pd => let
                          val pd = OS.IO.pollIn pd  
                          val fis' = TextIO.StreamIO.mkInstream (r, v)  
155                           val c = ref (Waiting [])                           val c = ref (Waiting [])
156                       in                       in
157                           inputs := (c, pd) :: !inputs;                           inputs := (c, OS.IO.pollIn pd) :: !inputs;
                          TextIO.setInstream (iis, fis');  
158                           c                           c
159                       end)                       end)
160            | _ => bad ()            | (_, "") => bad ()
161              | rv => ref (Arrived ())
162        in
163            TextIO.setInstream (iis, TextIO.StreamIO.mkInstream rv);
164            c
165      end      end
166  end  end

Legend:
Removed from v.464  
changed lines
  Added in v.632

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0