Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/compile/concur.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/compile/concur.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 451 - (view) (download)

1 : blume 448 (*
2 :     * A very simple concurrency package (inspired by CML and the concept of
3 :     * "futures", but much less powerful).
4 :     * - uses no preemption
5 :     * - thread gives up control by waiting on a condition
6 : blume 449 * - conditions can signal thread termination, available input on some
7 :     * text stream, or on some explicitly signalled "unit" condition
8 : blume 448 * - gives total priority to "local" computations
9 :     * (meaning that all threads must get stuck before I/O is even checked)
10 :     * (This is just here to utilize some external concurrency, i.e., OS
11 :     * processes. The processes must synchronize themselves with the
12 :     * controlling process via I/O.)
13 :     *
14 :     * (C) 1999 Lucent Technologies, Bell Laboratories
15 :     *
16 :     * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
17 :     *)
18 :     signature CONCUR = sig
19 :    
20 :     type 'a cond (* condition with value *)
21 :    
22 :     val fork : (unit -> 'a) -> 'a cond (* termination condition with value *)
23 :     val wait : 'a cond -> 'a
24 :     val inputReady : TextIO.instream -> unit cond
25 :     val ucond : unit -> unit cond
26 :     val signal : unit cond -> unit
27 : blume 451
28 :     (* forget all waiting threads and input conditions *)
29 :     val reset : unit -> unit
30 : blume 448 end
31 :    
32 :     structure Concur :> CONCUR = struct
33 :    
34 :     type tstate = unit SMLofNJ.Cont.cont
35 :    
36 :     datatype 'a cstate =
37 :     Arrived of 'a (* value *)
38 :     | Waiting of tstate list (* waiting threads *)
39 :    
40 :     type 'a cond = 'a cstate ref
41 :    
42 :     type 'a queue = ('a list * 'a list) ref
43 :    
44 :     fun enqueue (x, q as ref (front, back)) = q := (front, x :: back)
45 :    
46 :     fun dequeue (ref ([], [])) = NONE
47 :     | dequeue (q as ref (x :: front, back)) = (q := (front, back); SOME x)
48 :     | dequeue (q as ref ([], back)) = (q := (rev back, []); dequeue q)
49 :    
50 :     val runable : tstate queue = ref ([], [])
51 :     val inputs = ref ([]: (unit cond * OS.IO.poll_desc) list)
52 :    
53 : blume 451 fun reset () = (runable := ([], []); inputs := [])
54 :    
55 : blume 448 (* we heavily favor non-I/O conditions, but that's ok for our purposes *)
56 :    
57 :     fun wakeup (ref (Arrived _), _) =
58 :     (Say.say ["woken up twice!\n"]; raise Fail "concur")
59 :     | wakeup (r as ref (Waiting tsl), v) =
60 :     (r := Arrived v; app (fn ts => enqueue (ts, runable)) tsl)
61 :    
62 :     fun ucond () = (ref (Waiting [])) : unit cond
63 :     fun signal (ref (Arrived ())) = ()
64 :     | signal uc = wakeup (uc, ())
65 :    
66 :     fun schedule_inputs () =
67 :     case !inputs of
68 :     [] => (Say.say ["deadlock!\n"]; raise Fail "concur")
69 :     | il => let
70 :     val dl = map #2 il
71 :     (* since nothing else is there to do we can afford to wait *)
72 :     val pil = OS.IO.poll (dl, NONE)
73 : blume 449 fun isReady (_, pd) = let
74 :     val pd_iod = OS.IO.pollToIODesc pd
75 :     fun sameIod pi =
76 :     OS.IO.compare (pd_iod,
77 :     OS.IO.pollToIODesc
78 :     (OS.IO.infoToPollDesc pi)) = EQUAL
79 :     in
80 :     List.exists sameIod pil
81 :     end
82 :     val (ready, notready) = List.partition isReady il
83 : blume 448 in
84 : blume 449 inputs := notready;
85 :     app (fn (c, _) => wakeup (c, ())) ready;
86 : blume 448 (* try to schedule again; if this fails it's bad *)
87 :     case dequeue runable of
88 :     NONE =>
89 :     (Say.say
90 :     ["schedule_inputs failed to wake anybody up!\n"];
91 :     raise Fail "concur")
92 :     | SOME ts => SMLofNJ.Cont.throw ts ()
93 :     end
94 :    
95 :     fun schedule () =
96 :     case dequeue runable of
97 :     NONE => schedule_inputs ()
98 :     | SOME ts => SMLofNJ.Cont.throw ts ()
99 :    
100 :     fun wait (ref (Arrived x)) = x
101 :     | wait (c as ref (Waiting tsl)) =
102 :     (SMLofNJ.Cont.callcc (fn ts => (c := Waiting (ts :: tsl);
103 :     schedule ()));
104 :     wait c)
105 :    
106 :     fun fork worker = let
107 :     val c = ref (Waiting [])
108 :     in
109 :     SMLofNJ.Cont.callcc (fn return =>
110 :     (SMLofNJ.Cont.callcc (fn ts => (enqueue (ts, runable);
111 :     SMLofNJ.Cont.throw return c));
112 :     wakeup (c, worker ());
113 :     schedule ()))
114 :     end
115 :    
116 :     fun inputReady iis = let
117 :     val fis = TextIO.getInstream iis
118 :     val (r, v) = TextIO.StreamIO.getReader fis
119 :     fun bad () = (Say.say ["inputReady: bad stream\n"];
120 :     raise Fail "concur")
121 :     in
122 :     case r of
123 :     TextPrimIO.RD { ioDesc = SOME d, ... } =>
124 :     (case OS.IO.pollDesc d of
125 :     NONE => bad ()
126 :     | SOME pd => let
127 :     val pd = OS.IO.pollIn pd
128 :     val fis' = TextIO.StreamIO.mkInstream (r, v)
129 :     val c = ref (Waiting [])
130 :     in
131 :     inputs := (c, pd) :: !inputs;
132 :     TextIO.setInstream (iis, fis');
133 :     c
134 :     end)
135 :     | _ => bad ()
136 :     end
137 :     end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0