SCM Repository
Annotation of /sml/trunk/src/cm/compile/concur.sml
Parent Directory
|
Revision Log
Revision 448 - (view) (download)
1 : | blume | 448 | (* |
2 : | * A very simple concurrency package (inspired by CML and the concept of | ||
3 : | * "futures", but much less powerful). | ||
4 : | * - uses no preemption | ||
5 : | * - thread gives up control by waiting on a condition | ||
6 : | * - conditions can signal thread termination and available input on some | ||
7 : | * text stream | ||
8 : | * - gives total priority to "local" computations | ||
9 : | * (meaning that all threads must get stuck before I/O is even checked) | ||
10 : | * (This is just here to utilize some external concurrency, i.e., OS | ||
11 : | * processes. The processes must synchronize themselves with the | ||
12 : | * controlling process via I/O.) | ||
13 : | * | ||
14 : | * (C) 1999 Lucent Technologies, Bell Laboratories | ||
15 : | * | ||
16 : | * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp) | ||
17 : | *) | ||
(*
 * CONCUR -- interface of the cooperative (non-preemptive) thread package.
 *)
signature CONCUR =
sig

    (* A condition.  Once it has been signalled it carries a value of
     * type 'a that every waiter receives. *)
    type 'a cond

    (* fork f: queue a new thread that will evaluate f () and return
     * immediately with the thread's termination condition.  That
     * condition is signalled with the result of f () once it is
     * available. *)
    val fork : (unit -> 'a) -> 'a cond

    (* wait c: return the value of c.  If c has not been signalled yet,
     * the calling thread is suspended and control passes to the
     * scheduler until c has been signalled. *)
    val wait : 'a cond -> 'a

    (* inputReady s: a condition that the scheduler signals when polling
     * reports input on the descriptor underlying text stream s.  I/O is
     * only polled when no thread is runnable. *)
    val inputReady : TextIO.instream -> unit cond

    (* ucond (): a fresh unit condition that has not been signalled. *)
    val ucond : unit -> unit cond

    (* signal c: signal c, making all of its waiters runnable.  Signalling
     * a condition that has already been signalled does nothing. *)
    val signal : unit cond -> unit

end
28 : | |||
structure Concur :> CONCUR = struct

    (* A suspended thread is a continuation that is resumed with (). *)
    type tstate = unit SMLofNJ.Cont.cont

    (* State of a condition: either its value has arrived, or it holds the
     * list of threads currently blocked on it. *)
    datatype 'a cstate =
        Arrived of 'a                   (* value *)
      | Waiting of tstate list          (* waiting threads *)

    type 'a cond = 'a cstate ref

    (* Functional FIFO queue kept in a ref cell: elements are taken from
     * the head of "front" and added to the head of "back"; "back" is
     * reversed into "front" whenever "front" runs empty. *)
    type 'a queue = ('a list * 'a list) ref

    (* enqueue (x, q): append x to the end of q. *)
    fun enqueue (x, q as ref (front, back)) = q := (front, x :: back)

    (* dequeue q: remove and return the oldest element of q, or NONE when
     * q is empty. *)
    fun dequeue (ref ([], [])) = NONE
      | dequeue (q as ref (x :: front, back)) = (q := (front, back); SOME x)
      | dequeue (q as ref ([], back)) = (q := (rev back, []); dequeue q)

    (* Threads that are ready to run. *)
    val runable : tstate queue = ref ([], [])

    (* Pending input conditions together with the poll descriptor that
     * triggers each of them. *)
    val inputs = ref ([]: (unit cond * OS.IO.poll_desc) list)

    (* we heavily favor non-I/O conditions, but that's ok for our purposes *)

    (* wakeup (c, v): record v as the value of c and move every thread
     * waiting on c to the run queue.  Waking a condition whose value has
     * already arrived is reported and raises Fail "concur". *)
    fun wakeup (ref (Arrived _), _) =
          (Say.say ["woken up twice!\n"]; raise Fail "concur")
      | wakeup (r as ref (Waiting tsl), v) =
          (r := Arrived v; app (fn ts => enqueue (ts, runable)) tsl)

    (* ucond (): a fresh unit condition with no value and no waiters. *)
    fun ucond () = (ref (Waiting [])) : unit cond

    (* signal c: wake the waiters of c; a no-op if c was already
     * signalled. *)
    fun signal (ref (Arrived ())) = ()
      | signal uc = wakeup (uc, ())

    (* schedule_inputs (): called when no thread is runnable.  Blocks in
     * OS.IO.poll on all registered input descriptors, wakes the
     * conditions whose descriptors became readable, removes them from
     * "inputs", and resumes the first runnable thread.  Never returns
     * normally: it either throws to a thread or raises Fail "concur"
     * (no inputs registered, or polling made nobody runnable). *)
    fun schedule_inputs () =
        case !inputs of
            [] => (Say.say ["deadlock!\n"]; raise Fail "concur")
          | il => let
                val dl = map #2 il
                (* since nothing else is there to do we can afford to wait *)
                val pil = OS.IO.poll (dl, NONE)
                (* OS.IO.poll only returns entries for the descriptors
                 * whose condition holds, so the result cannot be paired
                 * positionally with "il"; match entries by their
                 * underlying I/O descriptor instead. *)
                fun sameDesc (pd, pi) =
                    OS.IO.compare
                        (OS.IO.pollToIODesc pd,
                         OS.IO.pollToIODesc (OS.IO.infoToPollDesc pi))
                    = EQUAL
                fun isReady (_, pd) =
                    List.exists
                        (fn pi => OS.IO.isIn pi andalso sameDesc (pd, pi))
                        pil
                val (ready, notready) = List.partition isReady il
            in
                (* keep every registration that did not fire ... *)
                inputs := notready;
                (* ... and make the waiters of the others runnable *)
                app (fn (c, _) => wakeup (c, ())) ready;
                (* try to schedule again; if this fails it's bad *)
                case dequeue runable of
                    NONE =>
                      (Say.say
                         ["schedule_inputs failed to wake anybody up!\n"];
                       raise Fail "concur")
                  | SOME ts => SMLofNJ.Cont.throw ts ()
            end

    (* schedule (): transfer control to the next runnable thread; if there
     * is none, fall back to waiting for input. *)
    fun schedule () =
        case dequeue runable of
            NONE => schedule_inputs ()
          | SOME ts => SMLofNJ.Cont.throw ts ()

    (* wait c: return the value of c.  If it has not arrived yet, add the
     * current continuation to the waiters of c and run the scheduler;
     * after being resumed the condition is examined again. *)
    fun wait (ref (Arrived x)) = x
      | wait (c as ref (Waiting tsl)) =
          (SMLofNJ.Cont.callcc (fn ts => (c := Waiting (ts :: tsl);
                                          schedule ()));
           wait c)

    (* fork worker: create a condition c, put a new thread on the run
     * queue and return c to the caller right away.  When the new thread
     * is eventually scheduled it evaluates worker (), signals c with the
     * result and hands control back to the scheduler. *)
    fun fork worker = let
        val c = ref (Waiting [])
    in
        SMLofNJ.Cont.callcc (fn return =>
          (* "ts" is the continuation that runs the worker below; queue it
           * and leave through "return" so the caller continues first *)
          (SMLofNJ.Cont.callcc (fn ts => (enqueue (ts, runable);
                                          SMLofNJ.Cont.throw return c));
           wakeup (c, worker ());
           schedule ()))
    end

    (* inputReady iis: register a new unit condition that schedule_inputs
     * signals when the descriptor underlying iis is reported readable,
     * and return that condition.  Reports the problem and raises
     * Fail "concur" if the stream's reader has no I/O descriptor or the
     * descriptor cannot be polled. *)
    fun inputReady iis = let
        val fis = TextIO.getInstream iis
        (* r is the underlying reader, v the data already buffered *)
        val (r, v) = TextIO.StreamIO.getReader fis
        fun bad () = (Say.say ["inputReady: bad stream\n"];
                      raise Fail "concur")
    in
        case r of
            TextPrimIO.RD { ioDesc = SOME d, ... } =>
              (case OS.IO.pollDesc d of
                   NONE => bad ()
                 | SOME pd => let
                       val pd = OS.IO.pollIn pd
                       (* the reader was extracted from the old stream, so
                        * build a replacement from the reader and the
                        * buffered data *)
                       val fis' = TextIO.StreamIO.mkInstream (r, v)
                       val c = ref (Waiting [])
                   in
                       inputs := (c, pd) :: !inputs;
                       TextIO.setInstream (iis, fis');
                       c
                   end)
          | _ => bad ()
    end
end
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |