Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/compile/concur.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/compile/concur.sml

Parent Directory | Revision Log


Revision 449 - (view) (download)

1 : blume 448 (*
2 :     * A very simple concurrency package (inspired by CML and the concept of
3 :     * "futures", but much less powerful).
4 :     * - uses no preemption
5 :     * - thread gives up control by waiting on a condition
6 : blume 449 * - conditions can signal thread termination, available input on some
7 :     * text stream, or on some explicitly signalled "unit" condition
8 : blume 448 * - gives total priority to "local" computations
9 :     * (meaning that all threads must get stuck before I/O is even checked)
10 :     * (This is just here to utilize some external concurrency, i.e., OS
11 :     * processes. The processes must synchronize themselves with the
12 :     * controlling process via I/O.)
13 :     *
14 :     * (C) 1999 Lucent Technologies, Bell Laboratories
15 :     *
16 :     * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
17 :     *)
18 :     signature CONCUR = sig
19 :     (* Interface of the cooperative thread package: threads are created with fork and give up control only inside wait. *)
20 :     type 'a cond (* condition with value *)
21 :    
22 :     val fork : (unit -> 'a) -> 'a cond (* termination condition with value *)
23 :     val wait : 'a cond -> 'a (* yields the value at once if the condition has arrived; otherwise suspends the caller and runs the scheduler until it has *)
24 :     val inputReady : TextIO.instream -> unit cond (* condition woken when polling reports the stream's descriptor ready; raises Fail "concur" if the stream has no pollable I/O descriptor *)
25 :     val ucond : unit -> unit cond (* fresh explicitly signalled condition with no waiters *)
26 :     val signal : unit cond -> unit (* marks the condition as arrived and makes its waiters runnable; does nothing if it has already arrived *)
27 :     end
28 :    
29 :     structure Concur :> CONCUR = struct
30 :    
31 :     type tstate = unit SMLofNJ.Cont.cont (* a suspended thread: resumed by throwing () to this continuation *)
32 :    
33 :     datatype 'a cstate = (* state of a condition *)
34 :     Arrived of 'a (* value *)
35 :     | Waiting of tstate list (* waiting threads *)
36 :    
37 :     type 'a cond = 'a cstate ref (* mutable so that wakeup can turn Waiting into Arrived in place *)
38 :    
39 :     type 'a queue = ('a list * 'a list) ref (* FIFO kept as (front, back); back holds the newest elements first *)
40 :     (* enqueue (x, q): add x at the tail of q by consing it onto the back list *)
41 :     fun enqueue (x, q as ref (front, back)) = q := (front, x :: back)
42 :     (* dequeue q: remove and return the oldest element as SOME x, or NONE if q is empty *)
43 :     fun dequeue (ref ([], [])) = NONE
44 :     | dequeue (q as ref (x :: front, back)) = (q := (front, back); SOME x)
45 :     | dequeue (q as ref ([], back)) = (q := (rev back, []); dequeue q) (* front exhausted: reverse back into front and retry *)
46 :    
47 :     val runable : tstate queue = ref ([], []) (* threads that are ready to be resumed *)
48 :     val inputs = ref ([]: (unit cond * OS.IO.poll_desc) list) (* pending input conditions, each paired with the poll descriptor that triggers it *)
49 :    
50 :     (* we heavily favor non-I/O conditions, but that's ok for our purposes *)
51 :     (* wakeup (c, v): record v as the value of c and move every thread waiting on c to the runable queue; waking a condition that has already arrived is reported and raises Fail "concur" *)
52 :     fun wakeup (ref (Arrived _), _) =
53 :     (Say.say ["woken up twice!\n"]; raise Fail "concur")
54 :     | wakeup (r as ref (Waiting tsl), v) =
55 :     (r := Arrived v; app (fn ts => enqueue (ts, runable)) tsl)
56 :    
57 :     fun ucond () = (ref (Waiting [])) : unit cond (* new unit condition, initially with no waiting threads *)
58 :     fun signal (ref (Arrived ())) = () (* already signalled: ignore instead of letting wakeup fail *)
59 :     | signal uc = wakeup (uc, ())
60 :     (* schedule_inputs (): called when no thread is runable. Polls all registered input descriptors, wakes the conditions whose descriptor is ready, drops them from inputs and resumes the first thread that became runable. Raises Fail "concur" if there are no inputs to wait for (deadlock) or if polling woke nobody. *)
61 :     fun schedule_inputs () =
62 :     case !inputs of
63 :     [] => (Say.say ["deadlock!\n"]; raise Fail "concur")
64 :     | il => let
65 :     val dl = map #2 il
66 :     (* since nothing else is there to do we can afford to wait *)
67 :     val pil = OS.IO.poll (dl, NONE) (* NONE is the timeout argument; presumably this blocks until some descriptor is ready -- confirm against the Basis documentation *)
68 : blume 449 fun isReady (_, pd) = let (* an input counts as ready if poll returned an info for the same I/O descriptor *)
69 :     val pd_iod = OS.IO.pollToIODesc pd
70 :     fun sameIod pi =
71 :     OS.IO.compare (pd_iod,
72 :     OS.IO.pollToIODesc
73 :     (OS.IO.infoToPollDesc pi)) = EQUAL
74 :     in
75 :     List.exists sameIod pil
76 :     end
77 :     val (ready, notready) = List.partition isReady il
78 : blume 448 in
79 : blume 449 inputs := notready; (* conditions about to be woken are no longer watched *)
80 :     app (fn (c, _) => wakeup (c, ())) ready;
81 : blume 448 (* try to schedule again; if this fails it's bad *)
82 :     case dequeue runable of
83 :     NONE =>
84 :     (Say.say
85 :     ["schedule_inputs failed to wake anybody up!\n"];
86 :     raise Fail "concur")
87 :     | SOME ts => SMLofNJ.Cont.throw ts ()
88 :     end
89 :     (* schedule (): resume the next runable thread, or fall back to schedule_inputs when the queue is empty. Control leaves through throw or an exception, so this never returns normally to its caller. *)
90 :     fun schedule () =
91 :     case dequeue runable of
92 :     NONE => schedule_inputs ()
93 :     | SOME ts => SMLofNJ.Cont.throw ts ()
94 :     (* wait c: return the value of c if it has arrived; otherwise add the current continuation to the waiters of c and hand control to the scheduler *)
95 :     fun wait (ref (Arrived x)) = x
96 :     | wait (c as ref (Waiting tsl)) =
97 :     (SMLofNJ.Cont.callcc (fn ts => (c := Waiting (ts :: tsl);
98 :     schedule ()));
99 :     wait c) (* ts is only enqueued by wakeup, which sets Arrived first, so this re-check takes the first clause *)
100 :     (* fork worker: create a condition for the result of worker, queue a new thread that will run worker, and return the condition to the caller right away. The new thread starts only when the scheduler picks it from the runable queue. *)
101 :     fun fork worker = let
102 :     val c = ref (Waiting [])
103 :     in
104 :     SMLofNJ.Cont.callcc (fn return =>
105 :     (SMLofNJ.Cont.callcc (fn ts => (enqueue (ts, runable); (* ts is the body of the new thread: everything after this inner callcc *)
106 :     SMLofNJ.Cont.throw return c)); (* hand c back to the caller of fork without running worker *)
107 :     wakeup (c, worker ()); (* reached only when ts is resumed: run worker and publish its result *)
108 :     schedule ())) (* the forked thread is finished; pass control to another thread *)
109 :     end
110 :     (* inputReady iis: register a unit condition that schedule_inputs wakes when the descriptor underlying iis polls as ready for input. Reports and raises Fail "concur" if the reader has no I/O descriptor or the descriptor cannot be polled. *)
111 :     fun inputReady iis = let
112 :     val fis = TextIO.getInstream iis
113 :     val (r, v) = TextIO.StreamIO.getReader fis (* r is the underlying reader, v the second component returned with it *)
114 :     fun bad () = (Say.say ["inputReady: bad stream\n"];
115 :     raise Fail "concur")
116 :     in
117 :     case r of
118 :     TextPrimIO.RD { ioDesc = SOME d, ... } =>
119 :     (case OS.IO.pollDesc d of
120 :     NONE => bad ()
121 :     | SOME pd => let
122 :     val pd = OS.IO.pollIn pd (* ask poll to report input readiness on this descriptor *)
123 :     val fis' = TextIO.StreamIO.mkInstream (r, v) (* rebuild a stream from the extracted parts; presumably needed because getReader invalidates fis -- confirm *)
124 :     val c = ref (Waiting [])
125 :     in
126 :     inputs := (c, pd) :: !inputs;
127 :     TextIO.setInstream (iis, fis'); (* reinstall the rebuilt stream into iis *)
128 :     c
129 :     end)
130 :     | _ => bad ()
131 :     end
132 :     end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0