Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/compile/concur.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/compile/concur.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 454 - (view) (download)

1 : blume 448 (*
2 :     * A very simple concurrency package (inspired by CML and the concept of
3 :     * "futures", but much less powerful).
4 :     * - uses no preemption
5 :     * - thread gives up control by waiting on a condition
6 : blume 449 * - conditions can signal thread termination, available input on some
7 :     * text stream, or on some explicitly signalled "unit" condition
8 : blume 448 * - gives total priority to "local" computations
9 :     * (meaning that all threads must get stuck before I/O is even checked)
10 :     * (This is just here to utilize some external concurrency, i.e., OS
11 :     * processes. The processes must synchronize themselves with the
12 :     * controlling process via I/O.)
13 :     *
14 :     * (C) 1999 Lucent Technologies, Bell Laboratories
15 :     *
16 :     * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
17 :     *)
18 :     signature CONCUR = sig
19 :    
20 :     type 'a cond (* condition with value *)
21 :    
22 :     val fork : (unit -> 'a) -> 'a cond (* termination condition with value *)
23 :     val wait : 'a cond -> 'a
24 : blume 454 val wait' : int -> 'a cond -> 'a
25 : blume 448 val inputReady : TextIO.instream -> unit cond
26 :     val ucond : unit -> unit cond
27 :     val signal : unit cond -> unit
28 : blume 451
29 :     (* forget all waiting threads and input conditions *)
30 :     val reset : unit -> unit
31 : blume 448 end
32 :    
33 :     structure Concur :> CONCUR = struct
34 :    
35 : blume 454 type tstate = unit SMLofNJ.Cont.cont * int
36 : blume 448
37 :     datatype 'a cstate =
38 :     Arrived of 'a (* value *)
39 :     | Waiting of tstate list (* waiting threads *)
40 :    
41 :     type 'a cond = 'a cstate ref
42 :    
43 : blume 454 (* simple and brain-dead priority queue *)
44 :     type task_queue = tstate list ref
45 : blume 448
46 : blume 454 fun enqueue (x as (_, xu), qr as ref q) = let
47 :     fun insert [] = [x]
48 :     | insert ((h as (_, hu)) :: r) =
49 :     if xu > hu then x :: h :: r else h :: insert r
50 :     in
51 :     qr := insert q
52 :     end
53 : blume 448
54 : blume 454 fun dequeue (ref []) = NONE
55 :     | dequeue (qr as ref (h :: t)) = (qr := t; SOME h)
56 : blume 448
57 : blume 454 val runable : task_queue = ref []
58 : blume 448 val inputs = ref ([]: (unit cond * OS.IO.poll_desc) list)
59 :    
60 : blume 454 fun reset () = (runable := []; inputs := [])
61 : blume 451
62 : blume 448 (* we heavily favor non-I/O conditions, but that's ok for our purposes *)
63 :    
64 :     fun wakeup (ref (Arrived _), _) =
65 :     (Say.say ["woken up twice!\n"]; raise Fail "concur")
66 :     | wakeup (r as ref (Waiting tsl), v) =
67 :     (r := Arrived v; app (fn ts => enqueue (ts, runable)) tsl)
68 :    
69 :     fun ucond () = (ref (Waiting [])) : unit cond
70 :     fun signal (ref (Arrived ())) = ()
71 :     | signal uc = wakeup (uc, ())
72 :    
73 :     fun schedule_inputs () =
74 :     case !inputs of
75 :     [] => (Say.say ["deadlock!\n"]; raise Fail "concur")
76 :     | il => let
77 :     val dl = map #2 il
78 :     (* since nothing else is there to do we can afford to wait *)
79 :     val pil = OS.IO.poll (dl, NONE)
80 : blume 449 fun isReady (_, pd) = let
81 :     val pd_iod = OS.IO.pollToIODesc pd
82 :     fun sameIod pi =
83 :     OS.IO.compare (pd_iod,
84 :     OS.IO.pollToIODesc
85 :     (OS.IO.infoToPollDesc pi)) = EQUAL
86 :     in
87 :     List.exists sameIod pil
88 :     end
89 :     val (ready, notready) = List.partition isReady il
90 : blume 448 in
91 : blume 449 inputs := notready;
92 :     app (fn (c, _) => wakeup (c, ())) ready;
93 : blume 448 (* try to schedule again; if this fails it's bad *)
94 :     case dequeue runable of
95 :     NONE =>
96 :     (Say.say
97 :     ["schedule_inputs failed to wake anybody up!\n"];
98 :     raise Fail "concur")
99 : blume 454 | SOME (ts, _) => SMLofNJ.Cont.throw ts ()
100 : blume 448 end
101 :    
102 :     fun schedule () =
103 :     case dequeue runable of
104 :     NONE => schedule_inputs ()
105 : blume 454 | SOME (ts, _) => SMLofNJ.Cont.throw ts ()
106 : blume 448
107 : blume 454 fun wait' _ (ref (Arrived x)) = x
108 :     | wait' u (c as ref (Waiting tsl)) =
109 :     (SMLofNJ.Cont.callcc (fn ts => (c := Waiting ((ts, u) :: tsl);
110 : blume 448 schedule ()));
111 : blume 454 wait' u c)
112 : blume 448
113 : blume 454 fun wait c = wait' 0 c
114 :    
115 : blume 448 fun fork worker = let
116 :     val c = ref (Waiting [])
117 :     in
118 :     SMLofNJ.Cont.callcc (fn return =>
119 : blume 454 (SMLofNJ.Cont.callcc (fn ts => (enqueue ((ts, 0), runable);
120 : blume 448 SMLofNJ.Cont.throw return c));
121 :     wakeup (c, worker ());
122 :     schedule ()))
123 :     end
124 :    
125 :     fun inputReady iis = let
126 :     val fis = TextIO.getInstream iis
127 :     val (r, v) = TextIO.StreamIO.getReader fis
128 :     fun bad () = (Say.say ["inputReady: bad stream\n"];
129 :     raise Fail "concur")
130 :     in
131 :     case r of
132 :     TextPrimIO.RD { ioDesc = SOME d, ... } =>
133 :     (case OS.IO.pollDesc d of
134 :     NONE => bad ()
135 :     | SOME pd => let
136 :     val pd = OS.IO.pollIn pd
137 :     val fis' = TextIO.StreamIO.mkInstream (r, v)
138 :     val c = ref (Waiting [])
139 :     in
140 :     inputs := (c, pd) :: !inputs;
141 :     TextIO.setInstream (iis, fis');
142 :     c
143 :     end)
144 :     | _ => bad ()
145 :     end
146 :     end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0