Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cm/compile/concur.sml
ViewVC logotype

Annotation of /sml/trunk/src/cm/compile/concur.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 448 - (view) (download)

(*
 * A very simple concurrency package (inspired by CML and the concept of
 * "futures", but much less powerful).
 *   - uses no preemption
 *   - thread gives up control by waiting on a condition
 *   - conditions can signal thread termination and available input on some
 *     text stream
 *   - gives total priority to "local" computations
 *     (meaning that all threads must get stuck before I/O is even checked)
 * (This is just here to utilize some external concurrency, i.e., OS
 *  processes.  The processes must synchronize themselves with the
 *  controlling process via I/O.)
 *
 * (C) 1999 Lucent Technologies, Bell Laboratories
 *
 * Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
 *)
(* Interface of the cooperative (non-preemptive) thread package. *)
signature CONCUR =
sig

    (* A condition.  Once it has arrived it carries a value of type 'a. *)
    type 'a cond

    (* ---- threads ---- *)

    (* fork worker: make a new runnable thread that will evaluate worker ()
     * and return immediately with the thread's termination condition.  The
     * condition arrives with worker's result. *)
    val fork : (unit -> 'a) -> 'a cond

    (* wait c: return c's value if it has already arrived; otherwise give
     * up control to other threads until it has, then return the value. *)
    val wait : 'a cond -> 'a

    (* ---- explicitly signalled conditions ---- *)

    (* ucond (): a fresh unit condition that has not arrived yet. *)
    val ucond : unit -> unit cond

    (* signal c: make c arrive and make its waiting threads runnable;
     * does nothing if c has already arrived. *)
    val signal : unit cond -> unit

    (* ---- I/O ---- *)

    (* inputReady s: a condition that arrives when polling reports input on
     * the descriptor underlying s.  Raises Fail "concur" if s has no
     * pollable I/O descriptor. *)
    val inputReady : TextIO.instream -> unit cond

end
28 :    
(*
 * Implementation of CONCUR on top of first-class continuations.
 *
 * A thread is represented by the continuation at which it was suspended.
 * There is no preemption: control changes hands only in wait (when the
 * condition has not arrived) and at the end of a forked worker.
 *)
structure Concur :> CONCUR = struct

    (* a suspended thread: resuming it means throwing () to it *)
    type tstate = unit SMLofNJ.Cont.cont

    datatype 'a cstate =
        Arrived of 'a               (* value *)
      | Waiting of tstate list      (* waiting threads *)

    type 'a cond = 'a cstate ref

    (* FIFO queue as a pair of lists: elements are taken from the first
     * list and added (in reverse order) to the second. *)
    type 'a queue = ('a list * 'a list) ref

    (* Report an internal failure via Say.say and abort with Fail "concur". *)
    fun bug msg = (Say.say [msg]; raise Fail "concur")

    fun enqueue (x, q as ref (front, back)) = q := (front, x :: back)

    (* Returns NONE on an empty queue.  When the front list is exhausted the
     * reversed back list becomes the new front. *)
    fun dequeue (ref ([], [])) = NONE
      | dequeue (q as ref (x :: front, back)) = (q := (front, back); SOME x)
      | dequeue (q as ref ([], back)) = (q := (rev back, []); dequeue q)

    (* threads that are ready to be resumed *)
    val runable : tstate queue = ref ([], [])

    (* input conditions that have not arrived yet, each paired with the
     * poll descriptor it was registered with *)
    val inputs = ref ([]: (unit cond * OS.IO.poll_desc) list)

    (* we heavily favor non-I/O conditions, but that's ok for our purposes *)

    (* wakeup (c, v): make c arrive with value v and move every thread that
     * waits on c to the run queue.  A condition may arrive only once. *)
    fun wakeup (ref (Arrived _), _) = bug "woken up twice!\n"
      | wakeup (r as ref (Waiting tsl), v) =
        (r := Arrived v; app (fn ts => enqueue (ts, runable)) tsl)

    fun ucond () = (ref (Waiting [])) : unit cond

    (* Signalling a condition that has already arrived is a no-op. *)
    fun signal (ref (Arrived ())) = ()
      | signal uc = wakeup (uc, ())

    (* Called when no thread is runnable: block in OS.IO.poll until at least
     * one registered input is ready, wake the corresponding conditions and
     * resume the first thread that became runnable.  Does not return.
     *
     * OS.IO.poll reports only those descriptors whose condition is enabled,
     * so its result can be shorter than the list that was passed in.  Each
     * registration is therefore matched against the reported infos by I/O
     * descriptor instead of by list position. *)
    fun schedule_inputs () =
        case !inputs of
            [] => bug "deadlock!\n"
          | il => let
                (* since nothing else is there to do we can afford to wait *)
                val pil = OS.IO.poll (map #2 il, NONE)
                (* I/O descriptors for which input was reported *)
                val readyIods =
                    List.mapPartial
                        (fn pi =>
                            if OS.IO.isIn pi then
                                SOME (OS.IO.pollToIODesc
                                          (OS.IO.infoToPollDesc pi))
                            else NONE)
                        pil
                fun isReady (_, pd) = let
                    val iod = OS.IO.pollToIODesc pd
                    fun same iod' = OS.IO.compare (iod, iod') = EQUAL
                in
                    List.exists same readyIods
                end
                val (ready, notReady) = List.partition isReady il
            in
                (* keep only the registrations that are still pending *)
                inputs := notReady;
                app (fn (c, _) => wakeup (c, ())) ready;
                (* try to schedule again; if this fails it's bad *)
                case dequeue runable of
                    NONE => bug "schedule_inputs failed to wake anybody up!\n"
                  | SOME ts => SMLofNJ.Cont.throw ts ()
            end

    (* Transfer control to the next runnable thread; inputs are looked at
     * only when the run queue is empty.  Does not return. *)
    fun schedule () =
        case dequeue runable of
            NONE => schedule_inputs ()
          | SOME ts => SMLofNJ.Cont.throw ts ()

    (* wait c: yield c's value.  If c has not arrived, the current
     * continuation is recorded among c's waiting threads and control goes
     * to the scheduler; once resumed, the condition is examined again. *)
    fun wait (ref (Arrived x)) = x
      | wait (c as ref (Waiting tsl)) =
        (SMLofNJ.Cont.callcc (fn ts => (c := Waiting (ts :: tsl);
                                        schedule ()));
         wait c)

    (* fork worker: returns a fresh condition immediately.  The inner
     * continuation ts is what gets queued as the new thread; when it is
     * eventually resumed it evaluates worker (), makes the condition arrive
     * with the result and hands control back to the scheduler. *)
    fun fork worker = let
        val c = ref (Waiting [])
    in
        SMLofNJ.Cont.callcc (fn return =>
          (SMLofNJ.Cont.callcc (fn ts => (enqueue (ts, runable);
                                          SMLofNJ.Cont.throw return c));
           (* from here on we are running as the forked thread *)
           wakeup (c, worker ());
           schedule ()))
    end

    (* inputReady iis: register the I/O descriptor underlying iis for
     * polling and return the condition that arrives when input is
     * reported.  The reader and buffered data obtained from the stream are
     * wrapped into a new functional stream which is installed back into
     * iis.  Raises Fail "concur" if the reader has no I/O descriptor or the
     * descriptor cannot be polled. *)
    fun inputReady iis = let
        val fis = TextIO.getInstream iis
        val (r, v) = TextIO.StreamIO.getReader fis
        fun bad () = bug "inputReady: bad stream\n"
    in
        case r of
            TextPrimIO.RD { ioDesc = SOME d, ... } =>
            (case OS.IO.pollDesc d of
                 NONE => bad ()
               | SOME pd => let
                     val pd = OS.IO.pollIn pd
                     val fis' = TextIO.StreamIO.mkInstream (r, v)
                     val c = ref (Waiting [])
                 in
                     inputs := (c, pd) :: !inputs;
                     TextIO.setInstream (iis, fis');
                     c
                 end)
          | _ => bad ()
    end
end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0