Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/concur/concur.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/concur/concur.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 805 - (view) (download)

1 : blume 464 (*
2 :     * A very simple concurrency package (inspired by CML and the concept of
3 :     * "futures", but much less powerful).
4 :     * - uses no preemption
5 :     * - thread gives up control by waiting on a condition
6 :     * - conditions can signal thread termination, available input on some
7 :     * text stream, or on some explicitly signalled "unit" condition
8 :     * - gives total priority to "local" computations
9 :     * (meaning that all threads must get stuck before I/O is even checked)
10 :     * (This is just here to utilize some external concurrency, i.e., OS
11 :     * processes. The processes must synchronize themselves with the
12 :     * controlling process via I/O.)
13 :     *
14 :     * (C) 1999 Lucent Technologies, Bell Laboratories
15 :     *
16 :     * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
17 :     *)
18 :     signature CONCUR = sig
19 :    
20 : blume 478 (* "private" is essentially the same as "unit", but we use it
21 :     * to make sure that only "pcond"-generated conditions are
22 :     * going to be explicitly signalled via "signal" *)
23 :     type private
24 :    
25 : blume 464 type 'a cond (* condition with value *)
26 :    
27 :     val fork : (unit -> 'a) -> 'a cond (* termination condition with value
28 :     * (thread initially waits with
29 :     * extremely low urgency) *)
30 :     val wait : 'a cond -> 'a (* wait with low urgency *)
31 :     val waitU : int -> 'a cond -> 'a (* wait with given urgency,
32 :     * (urgency is always higher than
33 :     * when waiting using "wait") *)
34 :    
35 :     val inputReady : TextIO.instream -> unit cond
36 :    
37 : blume 478 val pcond : unit -> private cond
38 :     val signal : private cond -> unit
39 :    
40 : blume 464 (* forget all waiting threads and input conditions *)
41 :     val reset : unit -> unit
42 : blume 632
43 :     (* check whether there are any (other) runable tasks... *)
44 :     val noTasks : unit -> bool
45 : blume 464 end
46 :    
47 :     structure Concur :> CONCUR = struct
48 :    
49 : blume 478 type private = unit
50 :    
51 : blume 464 type tstate = unit SMLofNJ.Cont.cont * int
52 :    
53 :     datatype 'a cstate =
54 :     Arrived of 'a (* value *)
55 :     | Waiting of tstate list (* waiting threads *)
56 :    
57 :     type 'a cond = 'a cstate ref
58 :    
59 :     (* simple and brain-dead priority queue *)
60 :     type task_queue = tstate list ref
61 :    
62 :     fun enqueue (x as (_, xu), qr as ref q) = let
63 :     fun insert [] = [x]
64 :     | insert ((h as (_, hu)) :: r) =
65 : blume 805 (* ">=" is important here. If we had used ">", then
66 :     * the code in btcompile.sml would not perform as
67 :     * desired. In particular, the parser thread
68 :     * would end up being scheduled first, effectively
69 :     * preventing the "cmb" message to be sent to the
70 :     * slaves. (With preemption this would not be a problem.) *)
71 :     if xu >= hu then x :: h :: r else h :: insert r
72 : blume 464 in
73 :     qr := insert q
74 :     end
75 :    
76 :     fun dequeue (ref []) = NONE
77 :     | dequeue (qr as ref (h :: t)) = (qr := t; SOME h)
78 :    
79 :     val runable : task_queue = ref []
80 :     val inputs = ref ([]: (unit cond * OS.IO.poll_desc) list)
81 :    
82 :     fun reset () = (runable := []; inputs := [])
83 :    
84 : blume 632 fun noTasks () = List.null (!runable)
85 :    
86 : blume 464 (* we heavily favor non-I/O conditions, but that's ok for our purposes *)
87 :    
88 :     fun wakeup (ref (Arrived _), _) =
89 :     (Say.say ["woken up twice!\n"]; raise Fail "concur")
90 :     | wakeup (r as ref (Waiting tsl), v) =
91 :     (r := Arrived v; app (fn ts => enqueue (ts, runable)) tsl)
92 :    
93 : blume 478 fun pcond () = (ref (Waiting [])) : private cond
94 : blume 464 fun signal (ref (Arrived ())) = ()
95 :     | signal uc = wakeup (uc, ())
96 :    
97 :     fun schedule_inputs () =
98 :     case !inputs of
99 :     [] => (Say.say ["deadlock!\n"]; raise Fail "concur")
100 :     | il => let
101 :     val dl = map #2 il
102 :     (* since nothing else is there to do we can afford to wait *)
103 :     val pil = OS.IO.poll (dl, NONE)
104 :     fun isReady (_, pd) = let
105 :     val pd_iod = OS.IO.pollToIODesc pd
106 :     fun sameIod pi =
107 :     OS.IO.compare (pd_iod,
108 :     OS.IO.pollToIODesc
109 :     (OS.IO.infoToPollDesc pi)) = EQUAL
110 :     in
111 :     List.exists sameIod pil
112 :     end
113 :     val (ready, notready) = List.partition isReady il
114 :     in
115 :     inputs := notready;
116 :     app (fn (c, _) => wakeup (c, ())) ready;
117 :     (* try to schedule again; if this fails it's bad *)
118 :     case dequeue runable of
119 :     NONE =>
120 :     (Say.say
121 :     ["schedule_inputs failed to wake anybody up!\n"];
122 :     raise Fail "concur")
123 :     | SOME (ts, _) => SMLofNJ.Cont.throw ts ()
124 :     end
125 :    
126 :     fun schedule () =
127 :     case dequeue runable of
128 :     NONE => schedule_inputs ()
129 :     | SOME (ts, _) => SMLofNJ.Cont.throw ts ()
130 :    
131 :     fun wait' _ (ref (Arrived x)) = x
132 :     | wait' u (c as ref (Waiting tsl)) =
133 :     (SMLofNJ.Cont.callcc (fn ts => (c := Waiting ((ts, u) :: tsl);
134 :     schedule ()));
135 :     wait' u c)
136 :    
137 :     fun wait c = wait' 0 c
138 :    
139 :     fun waitU u c = wait' (u + 1) c
140 :    
141 :     fun fork worker = let
142 :     val c = ref (Waiting [])
143 :     in
144 :     SMLofNJ.Cont.callcc (fn return =>
145 :     (SMLofNJ.Cont.callcc (fn ts => (enqueue ((ts, ~1), runable);
146 :     SMLofNJ.Cont.throw return c));
147 :     wakeup (c, worker ());
148 :     schedule ()))
149 :     end
150 :    
151 :     fun inputReady iis = let
152 :     val fis = TextIO.getInstream iis
153 :     fun bad () = (Say.say ["inputReady: bad stream\n"];
154 :     raise Fail "concur")
155 : blume 484 val rv = TextIO.StreamIO.getReader fis
156 :     val c = case rv of
157 :     (TextPrimIO.RD { ioDesc = SOME d, ... }, "") =>
158 : blume 464 (case OS.IO.pollDesc d of
159 :     NONE => bad ()
160 :     | SOME pd => let
161 :     val c = ref (Waiting [])
162 :     in
163 : blume 484 inputs := (c, OS.IO.pollIn pd) :: !inputs;
164 : blume 464 c
165 :     end)
166 : blume 484 | (_, "") => bad ()
167 :     | rv => ref (Arrived ())
168 :     in
169 :     TextIO.setInstream (iis, TextIO.StreamIO.mkInstream rv);
170 :     c
171 : blume 464 end
172 :     end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0