Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/concur/concur.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/concur/concur.sml

Parent Directory | Revision Log


Revision 484 - (view) (download)

1 : blume 464 (*
2 :     * A very simple concurrency package (inspired by CML and the concept of
3 :     * "futures", but much less powerful).
4 :     * - uses no preemption
5 :     * - thread gives up control by waiting on a condition
6 :     * - conditions can signal thread termination, available input on some
7 :     * text stream, or on some explicitly signalled "unit" condition
8 :     * - gives total priority to "local" computations
9 :     * (meaning that all threads must get stuck before I/O is even checked)
10 :     * (This is just here to utilize some external concurrency, i.e., OS
11 :     * processes. The processes must synchronize themselves with the
12 :     * controlling process via I/O.)
13 :     *
14 :     * (C) 1999 Lucent Technologies, Bell Laboratories
15 :     *
16 :     * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
17 :     *)
18 :     signature CONCUR = sig
19 :    
20 : blume 478 (* "private" is essentially the same as "unit", but we use it
21 :     * to make sure that only "pcond"-generated conditions are
22 :     * going to be explicitly signalled via "signal" *)
23 :     type private
24 :    
25 : blume 464 type 'a cond (* condition with value *)
26 :    
27 :     val fork : (unit -> 'a) -> 'a cond (* termination condition with value
28 :     * (thread initially waits with
29 :     * extremely low urgency) *)
30 :     val wait : 'a cond -> 'a (* wait with low urgency *)
31 :     val waitU : int -> 'a cond -> 'a (* wait with given urgency
32 :     * (for non-negative arguments the urgency is
33 :     * always higher than when waiting using "wait") *)
34 :    
35 :     val inputReady : TextIO.instream -> unit cond
36 :    
37 : blume 478 val pcond : unit -> private cond
38 :     val signal : private cond -> unit
39 :    
40 : blume 464 (* forget all waiting threads and input conditions *)
41 :     val reset : unit -> unit
42 :     end
43 :    
44 :     structure Concur :> CONCUR = struct
45 :    
46 : blume 478 type private = unit
47 :    
48 : blume 464 type tstate = unit SMLofNJ.Cont.cont * int
49 :    
50 :     datatype 'a cstate =
51 :     Arrived of 'a (* value *)
52 :     | Waiting of tstate list (* waiting threads *)
53 :    
54 :     type 'a cond = 'a cstate ref
55 :    
56 :     (* simple and brain-dead priority queue: a list kept in descending order of urgency *)
57 :     type task_queue = tstate list ref
58 :    
59 :     fun enqueue (x as (_, xu), qr as ref q) = let
60 :     fun insert [] = [x]
61 :     | insert ((h as (_, hu)) :: r) =
62 :     if xu > hu then x :: h :: r else h :: insert r
63 :     in
64 :     qr := insert q
65 :     end
66 :    
67 :     fun dequeue (ref []) = NONE
68 :     | dequeue (qr as ref (h :: t)) = (qr := t; SOME h)
69 :    
70 :     val runable : task_queue = ref []
71 :     val inputs = ref ([]: (unit cond * OS.IO.poll_desc) list)
72 :    
73 :     fun reset () = (runable := []; inputs := [])
74 :    
75 :     (* we heavily favor non-I/O conditions, but that's ok for our purposes *)
76 :    
77 :     fun wakeup (ref (Arrived _), _) =
78 :     (Say.say ["woken up twice!\n"]; raise Fail "concur")
79 :     | wakeup (r as ref (Waiting tsl), v) =
80 :     (r := Arrived v; app (fn ts => enqueue (ts, runable)) tsl)
81 :    
82 : blume 478 fun pcond () = (ref (Waiting [])) : private cond
83 : blume 464 fun signal (ref (Arrived ())) = ()
84 :     | signal uc = wakeup (uc, ())
85 :    
86 :     fun schedule_inputs () =
87 :     case !inputs of
88 :     [] => (Say.say ["deadlock!\n"]; raise Fail "concur")
89 :     | il => let
90 :     val dl = map #2 il
91 :     (* since there is nothing else to do, we can afford to block in poll (no timeout) *)
92 :     val pil = OS.IO.poll (dl, NONE)
93 :     fun isReady (_, pd) = let
94 :     val pd_iod = OS.IO.pollToIODesc pd
95 :     fun sameIod pi =
96 :     OS.IO.compare (pd_iod,
97 :     OS.IO.pollToIODesc
98 :     (OS.IO.infoToPollDesc pi)) = EQUAL
99 :     in
100 :     List.exists sameIod pil
101 :     end
102 :     val (ready, notready) = List.partition isReady il
103 :     in
104 :     inputs := notready;
105 :     app (fn (c, _) => wakeup (c, ())) ready;
106 :     (* try to schedule again; if this fails it's bad *)
107 :     case dequeue runable of
108 :     NONE =>
109 :     (Say.say
110 :     ["schedule_inputs failed to wake anybody up!\n"];
111 :     raise Fail "concur")
112 :     | SOME (ts, _) => SMLofNJ.Cont.throw ts ()
113 :     end
114 :    
115 :     fun schedule () =
116 :     case dequeue runable of
117 :     NONE => schedule_inputs ()
118 :     | SOME (ts, _) => SMLofNJ.Cont.throw ts ()
119 :    
120 :     fun wait' _ (ref (Arrived x)) = x
121 :     | wait' u (c as ref (Waiting tsl)) =
122 :     (SMLofNJ.Cont.callcc (fn ts => (c := Waiting ((ts, u) :: tsl);
123 :     schedule ()));
124 :     wait' u c)
125 :    
126 :     fun wait c = wait' 0 c
127 :    
128 :     fun waitU u c = wait' (u + 1) c
129 :    
130 :     fun fork worker = let
131 :     val c = ref (Waiting [])
132 :     in
133 :     SMLofNJ.Cont.callcc (fn return =>
134 :     (SMLofNJ.Cont.callcc (fn ts => (enqueue ((ts, ~1), runable);
135 :     SMLofNJ.Cont.throw return c));
136 :     wakeup (c, worker ());
137 :     schedule ()))
138 :     end
139 :    
140 :     fun inputReady iis = let
141 :     val fis = TextIO.getInstream iis
142 :     fun bad () = (Say.say ["inputReady: bad stream\n"];
143 :     raise Fail "concur")
144 : blume 484 val rv = TextIO.StreamIO.getReader fis
145 :     val c = case rv of
146 :     (TextPrimIO.RD { ioDesc = SOME d, ... }, "") =>
147 : blume 464 (case OS.IO.pollDesc d of
148 :     NONE => bad ()
149 :     | SOME pd => let
150 :     val c = ref (Waiting [])
151 :     in
152 : blume 484 inputs := (c, OS.IO.pollIn pd) :: !inputs;
153 : blume 464 c
154 :     end)
155 : blume 484 | (_, "") => bad ()
156 :     | rv => ref (Arrived ())
157 :     in
158 :     TextIO.setInstream (iis, TextIO.StreamIO.mkInstream rv);
159 :     c
160 : blume 464 end
161 :     end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0