Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/compile/concur.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/compile/concur.sml

Parent Directory | Revision Log


Revision 462 - (view) (download)

(*
 * A very simple concurrency package (inspired by CML and the concept of
 * "futures", but much less powerful).
 *   - uses no preemption
 *   - thread gives up control by waiting on a condition
 *   - conditions can signal thread termination, available input on some
 *     text stream, or on some explicitly signalled "unit" condition
 *   - gives total priority to "local" computations
 *     (meaning that all threads must get stuck before I/O is even checked)
 * (This is just here to utilize some external concurrency, i.e., OS
 *  processes.  The processes must synchronize themselves with the
 *  controlling process via I/O.)
 *
 * (C) 1999 Lucent Technologies, Bell Laboratories
 *
 * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
 *)
signature CONCUR = sig

    (* A condition; once it has arrived it carries a value of type 'a. *)
    type 'a cond

    (* Start a new thread running the given function.  The result is the
     * thread's termination condition, which delivers the function's
     * return value.  The new thread initially waits with extremely low
     * urgency. *)
    val fork : (unit -> 'a) -> 'a cond

    (* Block until the condition has arrived and return its value.
     * This is the low-urgency way of waiting. *)
    val wait : 'a cond -> 'a

    (* Like "wait", but with a caller-supplied urgency.  For any
     * non-negative urgency the thread ranks above threads that used
     * "wait". *)
    val waitU : int -> 'a cond -> 'a

    (* A condition tied to input becoming available on a text stream. *)
    val inputReady : TextIO.instream -> unit cond

    (* Create a fresh "unit" condition that is triggered explicitly ... *)
    val ucond : unit -> unit cond

    (* ... by signalling it. *)
    val signal : unit cond -> unit

    (* forget all waiting threads and input conditions *)
    val reset : unit -> unit
end
37 :    
structure Concur :> CONCUR = struct

    structure K = SMLofNJ.Cont

    (* A suspended thread: the continuation that resumes it, paired with
     * the urgency it was suspended with (larger urgencies run first). *)
    type tstate = unit K.cont * int

    (* A condition either already holds its value, or holds the list of
     * threads that are still blocked on it. *)
    datatype 'a cstate =
        Arrived of 'a                   (* value *)
      | Waiting of tstate list          (* waiting threads *)

    type 'a cond = 'a cstate ref

    (* simple and brain-dead priority queue: a list kept sorted by
     * descending urgency *)
    type task_queue = tstate list ref

    (* Report an internal scheduler failure through Say.say and abort by
     * raising Fail "concur".  Never returns. *)
    fun die msg = (Say.say [msg]; raise Fail "concur")

    (* Insert thread x into the queue.  x goes in front of the first
     * entry with strictly lower urgency, i.e. behind all entries of
     * equal or higher urgency. *)
    fun enqueue (x as (_, xu), qr as ref q) = let
        fun insert [] = [x]
          | insert ((h as (_, hu)) :: r) =
              if xu > hu then x :: h :: r else h :: insert r
    in
        qr := insert q
    end

    (* Remove and return the head of the queue; NONE if it is empty. *)
    fun dequeue (ref []) = NONE
      | dequeue (qr as ref (h :: t)) = (qr := t; SOME h)

    (* Threads that are ready to be resumed. *)
    val runable : task_queue = ref []

    (* Input conditions that have not fired yet, each with the poll
     * descriptor that is watched on its behalf. *)
    val inputs = ref ([] : (unit cond * OS.IO.poll_desc) list)

    (* Empty both the run queue and the list of pending input conditions. *)
    fun reset () = (runable := []; inputs := [])

    (* we heavily favor non-I/O conditions, but that's ok for our purposes *)

    (* Mark a condition as arrived with value v and move every thread
     * that was blocked on it to the run queue.  Waking a condition that
     * has already arrived is an internal error. *)
    fun wakeup (ref (Arrived _), _) = die "woken up twice!\n"
      | wakeup (r as ref (Waiting tsl), v) =
          (r := Arrived v; app (fn ts => enqueue (ts, runable)) tsl)

    (* A fresh, not yet signalled unit condition. *)
    fun ucond () = (ref (Waiting [])) : unit cond

    (* Signal a unit condition; signalling it again is a no-op. *)
    fun signal (ref (Arrived ())) = ()
      | signal uc = wakeup (uc, ())

    (* Transfer control to the first thread in the run queue.  If the
     * queue is empty, fall back to whenIdle instead. *)
    fun resumeNext whenIdle =
        case dequeue runable of
            SOME (ts, _) => K.throw ts ()
          | NONE => whenIdle ()

    (* Called when no thread is runnable: poll all registered input
     * descriptors, wake the conditions whose descriptor is reported by
     * the poll, and resume a thread.  Having no registered inputs at
     * this point means nothing can ever make progress (deadlock). *)
    fun schedule_inputs () =
        case !inputs of
            [] => die "deadlock!\n"
          | il => let
                (* since nothing else is there to do we can afford to wait *)
                val pil = OS.IO.poll (map #2 il, NONE)
                (* An input is ready if its I/O descriptor is among those
                 * that the poll reported. *)
                fun isReady (_, pd) = let
                    val pd_iod = OS.IO.pollToIODesc pd
                    fun sameIod pi =
                        OS.IO.compare
                            (pd_iod,
                             OS.IO.pollToIODesc (OS.IO.infoToPollDesc pi))
                        = EQUAL
                in
                    List.exists sameIod pil
                end
                val (ready, notready) = List.partition isReady il
            in
                inputs := notready;
                app (fn (c, _) => wakeup (c, ())) ready;
                (* try to schedule again; if this fails it's bad *)
                resumeNext
                    (fn () =>
                        die "schedule_inputs failed to wake anybody up!\n")
            end

    (* Give up control: resume a runnable thread, and only look at I/O
     * when there is none. *)
    fun schedule () = resumeNext schedule_inputs

    (* Wait on condition c with urgency u.  If the value is already
     * there it is returned right away; otherwise the current
     * continuation is recorded in the condition's waiting list and
     * control passes to the scheduler.  Once resumed, the condition is
     * re-examined. *)
    fun wait' _ (ref (Arrived x)) = x
      | wait' u (c as ref (Waiting tsl)) =
          (K.callcc (fn ts => (c := Waiting ((ts, u) :: tsl);
                               schedule ()));
           wait' u c)

    (* Wait with urgency 0. *)
    fun wait c = wait' 0 c

    (* Wait with urgency u + 1.  For every u >= 0 this is above the
     * urgency used by "wait"; a negative u is not clamped. *)
    fun waitU u c = wait' (u + 1) c

    (* Create a thread that runs worker and return its termination
     * condition.  The condition is handed back to the caller right
     * away; worker itself only starts once the scheduler picks the new
     * thread from the run queue. *)
    fun fork worker = let
        val c = ref (Waiting [])
    in
        (* We give new workers a low priority so that any threads that
         * were already running but are now waiting for some event
         * get control first if they are re-enabled.  This is because
         * waiting threads will clean up after errors in which case
         * we don't want new threads to run off. *)
        K.callcc (fn return =>
          (K.callcc (fn ts => (enqueue ((ts, ~1), runable);
                               K.throw return c));
           (* from here on we are running as the new thread *)
           wakeup (c, worker ());
           schedule ()))
    end

    (* Register text stream iis with the scheduler and return a unit
     * condition for it.  The condition is woken by schedule_inputs when
     * the stream's descriptor shows up in a poll result.  A stream
     * without an I/O descriptor, or one that cannot be polled, is
     * rejected with Fail "concur". *)
    fun inputReady iis = let
        val (r, v) = TextIO.StreamIO.getReader (TextIO.getInstream iis)
        (* getReader truncates the stream it is applied to, so put an
         * equivalent stream (same reader, same buffered data) back at
         * once.  Doing it here rather than on the success path only
         * keeps iis usable even when registration fails below. *)
        val _ = TextIO.setInstream (iis, TextIO.StreamIO.mkInstream (r, v))
        fun bad () = die "inputReady: bad stream\n"
    in
        case r of
            TextPrimIO.RD { ioDesc = SOME d, ... } =>
              (case OS.IO.pollDesc d of
                   NONE => bad ()
                 | SOME pd => let
                       val c = ref (Waiting [])
                   in
                       inputs := (c, OS.IO.pollIn pd) :: !inputs;
                       c
                   end)
          | _ => bad ()
    end
end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0