Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/benchmarks/todo/cml-sieve/concur.sml
ViewVC logotype

Annotation of /sml/trunk/benchmarks/todo/cml-sieve/concur.sml

Parent Directory | Revision Log


Revision 193 - (view) (download)

1 : monnier 193 (* concur.sml
2 :     *
3 :     * COPYRIGHT (c) 1990 by John H. Reppy. See COPYRIGHT file for details.
4 :     *)
5 :    
6 :     functor ConcurML () : INTERNAL_CML =
7 :     struct
8 :    
9 :     (* we must use the fully polymorphic versions of callcc, etc. *)
10 :     open SMLofNJ.Cont
11 :    
12 :     val versionName = "Concurrent ML, version 0.9.6, October 11, 1991"
13 :    
14 :     (* some utility functions that should be inlined *)
(* reverse (xs, acc): move the elements of xs one at a time onto the front
 * of acc, so the result is xs reversed followed by acc. *)
fun reverse (xs, acc) = (case xs
      of [] => acc
       | (hd :: tl) => reverse (tl, hd :: acc)
      (* end case *))
(* function composition: (f o g) x = f (g x) *)
val op o = fn (f, g) => fn x => f (g x)
(* map f l: apply f to each element of l from left to right and return the
 * results in the same order.  The results are collected in reverse and
 * flipped at the end, so the loop is tail recursive. *)
fun map f l = let
      fun loop (pending, acc) = (case pending
            of [] => reverse (acc, [])
             | (x :: rest) => loop (rest, f x :: acc)
            (* end case *))
      in
        loop (l, [])
      end
(* revmap f l: like map, but f is applied to the elements of l from last to
 * first; the result list is still in the order of l. *)
fun revmap f l = let
      val backwards = reverse (l, [])
      fun loop (pending, acc) = (case pending
            of [] => acc
             | (x :: rest) => loop (rest, f x :: acc)
            (* end case *))
      in
        loop (backwards, [])
      end
(* list append; when the right operand is empty the left operand is returned
 * as is, otherwise it is reversed onto a scratch list and then reversed
 * back onto the front of the right operand. *)
fun a @ b = (case b
      of [] => a
       | _ => let
           fun push ([], revA) = reverse (revA, b)
             | push (x :: rest, revA) = push (rest, x :: revA)
           in
             push (a, [])
           end
      (* end case *))
(* x before y: both operands are evaluated (x first); the result is x *)
val op before = fn (x, _) => x
38 :    
39 :     (* queues *)
(* A FIFO queue held as two mutable lists: new items are consed onto `rear`
 * (newest first) and items are taken from the head of `front`. *)
datatype 'a queue_t = Q of {front : 'a list ref, rear : 'a list ref}

(* create a new, empty queue *)
fun queueNew () = let
      val front = ref []
      val rear = ref []
      in
        Q{front = front, rear = rear}
      end

(* queue insert, curried form: queueInsc q x adds x at the rear of q *)
fun queueInsc (Q{rear, ...}) x = (rear := x :: !rear)

(* queue insert, tupled form *)
fun queueIns (q, x) = let
      val Q{rear, ...} = q
      in
        rear := x :: !rear
      end
46 :     (* remove the head of the queue *)
exception EmptyQ

(* queueRem q: remove and return the oldest element of q.  When `front` is
 * exhausted the `rear` list is reversed into it first.  Raises EmptyQ if
 * both lists are empty. *)
fun queueRem (Q{front, rear}) = (case (!front, !rear)
      of ([], []) => raise EmptyQ
       | ([], pending) => let
           val (x :: r) = reverse (pending, [])
           in
             front := r; rear := []; x
           end
       | (x :: r, _) => (front := r; x)
      (* end case *))
55 :    
56 :     (* report an internal error to std_err *)
(* reportError msg: write msg to the standard error stream, prefixed with
 * "CML: " and terminated by a newline.  If msg starts with a newline, that
 * newline is moved in front of the prefix.  Signals are masked while the
 * output is performed.
 *
 * The leading-newline test is done with size/String.sub on characters: the
 * previous `ord(msg) = ord("\n")` applied `ord` to strings, which does not
 * type-check against the SML'97 Basis used elsewhere in this file
 * (ord : char -> int), and it had no guard for an empty message.
 *)
fun reportError msg = let
      val startsWithNewline =
            (size msg > 0) andalso (String.sub (msg, 0) = #"\n")
      val s = if startsWithNewline
            then concat["\nCML: ", substring(msg, 1, (size msg)-1), "\n"]
            else concat["CML: ", msg, "\n"]
      in
        System.Signals.maskSignals true;
        IO.output(IO.std_err, s);
        System.Signals.maskSignals false
      end
66 :    
exception InternalError

(* error s: report s as an internal error, then raise InternalError *)
fun error s = let
      val msg = "\nINTERNAL ERROR: " ^ s
      in
        reportError msg;
        raise InternalError
      end
69 :    
70 :     (* timers *)
(* Time and interval-timer helpers.
 *
 * Fixes relative to the previous text:
 *   - `currentTime` now calls `Time.now ()`; it used to return the function
 *     `Time.now` itself, contradicting its `unit -> Time.time` specification.
 *   - the type is written `Time.time` everywhere; the bare name `time` is not
 *     bound by anything visible in this file.
 *
 * NOTE(review): `setitimer` is not defined in the visible part of the file;
 * it is assumed to be in scope from the enclosing environment -- confirm.
 *)
structure T : sig
    val earlier : Time.time * Time.time -> bool
    val add_time : Time.time * Time.time -> Time.time
    val zeroTime : Time.time
    val currentTime : unit -> Time.time
    val timerOff : unit -> unit
    val timerOn : Time.time option -> unit
    val restartTimer : unit -> unit
  end = struct
    val earlier = Time.<
    val add_time = Time.+
    val zeroTime = Time.zeroTime
    fun currentTime () = Time.now ()
    (* the interval most recently passed to timerOn (NONE if none) *)
    val saveTime = ref (NONE : Time.time option)
    (* turn the timer off by setting it to zero *)
    fun timerOff () = setitimer (0, zeroTime, zeroTime)
    (* remember the requested interval and, if there is one, start the timer *)
    fun timerOn t = (
          saveTime := t;
          case t of (SOME tq) => setitimer (0, tq, tq) | _ => ())
    (* re-arm the timer with the remembered interval, if any *)
    fun restartTimer () =
          case !saveTime of (SOME tq) => setitimer (0, tq, tq) | _ => ()
  end
92 :     open T
93 :    
94 :    
95 :     (* the termination function *)
(* hook that is called when nothing is left to wait for; initially a no-op *)
val shutdown : (unit -> unit) ref = ref (fn () => ())
97 :    
98 :    
99 :     (* Per-thread descriptors *)
(* A thread id pairs an integer id with a unit-valued condition variable
 * (`death_cond`).  A condition variable is a mutable cell that is either
 * unset, holding a list of waiters (thread id, flag, continuation expecting
 * the value), or set to a value.  Code later in this file skips waiters
 * whose flag already holds true. *)
100 :     datatype thread_id = TID of { (* thread ids *)
101 :     id : int,
102 :     death_cond : unit cond_var
103 :     }
104 :     (* condition variables *)
105 :     and 'a cond_var = COND of 'a cond_state ref
106 :     and 'a cond_state
107 :     = COND_unset of (thread_id * bool ref * 'a cont) list
108 :     | COND_set of 'a
109 :    
110 :     (* channels *)
(* A channel is a pair of queues: `inq` holds waiting receivers (thread id
 * and a continuation taking the message) and `outq` holds waiting senders
 * (thread id, the message, and a unit continuation).  Each queue entry is
 * tagged with a bool ref flag (see `chanq`). *)
111 :     datatype 'a chan = CHAN of {
112 :     inq : (thread_id * 'a cont) chanq,
113 :     outq : (thread_id * 'a * unit cont) chanq
114 :     }
115 :     withtype 'a chanq = (bool ref * 'a) queue_t
116 :    
117 :     (* events *)
(* An abort action is either absent or a unit -> unit function. *)
118 :     datatype abort_fn = NO_ABORT | ABORT of (unit -> unit)
(* A base event is a record of four components:
 *   pollfn  - test function returning a bool
 *   dofn    - takes an abort_fn and produces the event's value
 *   blockfn - takes (flag, abort_fn, next) and produces the event's value
 *   abortfn - the abort action attached to this base event *)
119 :     datatype 'a base_evt = BASE_EVT of {
120 :     pollfn : unit -> bool,
121 :     dofn : abort_fn -> 'a,
122 :     blockfn : (bool ref * abort_fn * (unit -> unit)) -> 'a,
123 :     abortfn : abort_fn
124 :     }
(* An event is either a list of base events plus an "has abort" flag, or a
 * guard: a thunk that produces the event when called. *)
125 :     datatype 'a event
126 :     = EVT of ('a base_evt list * bool) (* the boolean is true if one of the *)
127 :     (* base events has an abort action *)
128 :     | GUARD of (unit -> 'a event)
129 :    
(* wrap a base-event record as a single-element event with no abort action *)
fun mkBaseEvt arg = let
      val bev = BASE_EVT arg
      in
        EVT([bev], false)
      end
131 :    
(* run an abort action if there is one *)
fun applyAbortFn abortfn = (case abortfn
      of NO_ABORT => ()
       | (ABORT a) => a ()
      (* end case *))
134 :    
135 :     (** Thread id creation **)
(* counter supplying the integer part of the next thread id *)
val nextId = ref 0

(* newId (): allocate a thread id with the next integer and a fresh, unset
 * death condition, and advance the counter. *)
fun newId () = let
      val n = !nextId
      val _ = (nextId := n + 1)
      in
        TID{id = n, death_cond = COND(ref(COND_unset[]))}
      end
142 :     (* the current thread is represented using the "var" register *)
(* Accessors for the current thread's id, bound directly to
 * System.Unsafe.getvar / setvar; `getTid` is an alias for `getCurThread`. *)
143 :     val getCurThread : unit -> thread_id = System.Unsafe.getvar
144 :     val setCurThread : thread_id -> unit = System.Unsafe.setvar
145 :     val getTid = getCurThread
146 :    
147 :     (* the thread ready queue *)
(* the queue of ready threads: (thread id, resumption continuation) pairs *)
val rdyQ : (thread_id * unit cont) queue_t = queueNew()

(* enqueue a ready thread *)
val enqueue = queueInsc rdyQ

(* enqueue the current thread, to be resumed at `resume` *)
fun enqueueCurThread resume = let
      val self = getTid()
      in
        enqueue (self, resume)
      end
153 :     (* add the current thread to the ready queue, and return the next one *)
(* switchCurThread k: put the current thread (resuming at k) at the back of
 * the ready queue and return the entry at the head.  With an empty queue
 * the current thread's own entry is returned and the queue is untouched. *)
fun switchCurThread k = let
      val curP = (getTid(), k)
      val Q{front, rear} = rdyQ
      in
        case (!front, !rear)
         of ([], []) => curP
          | ([], waiting) => let
              val (x::r) = reverse(waiting, [curP])
              in
                front := r; rear := []; x
              end
          | (x::r, waiting) => (front := r; rear := curP :: waiting; x)
        (* end case *)
      end
166 :    
167 :    
168 :     (** I/O wait queues **)
(* pollFDs (rdfds, wrfds, blocking): call select on the given read and write
 * descriptor lists and return the (read, write) lists it reports.  When
 * `blocking` is false a zero timeout is passed; otherwise no timeout. *)
fun pollFDs (rdfds, wrfds, blocking) = let
      val timeout = if blocking then NONE else (SOME zeroTime)
      in
        case System.Unsafe.SysIO.select(rdfds, wrfds, [], timeout)
         of (rd, wr, _) => (rd, wr)
      end
175 :     (* The list of I/O wait events *)
(* The kind of I/O a thread is waiting for: read or write. *)
176 :     datatype io_operation_t = IO_RD | IO_WR
(* One entry of the I/O wait list.  Entries whose `dirty` flag holds true
 * are skipped and dropped by the functions that scan the list. *)
177 :     type io_item = {
178 :     fd : int, (* the file descriptor *)
179 :     io_op : io_operation_t, (* the operation being waited for *)
180 :     kont : unit cont, (* the synchronization continuation and *)
181 :     id : thread_id, (* the id of the waiting thread *)
182 :     err_kont : unit cont, (* the error continuation of the thread *)
183 :     dirty : bool ref (* the dirty bit *)
184 :     }
(* the list of pending I/O waits, initially empty *)
185 :     val ioWaitList = ref ([] : io_item list)
186 :    
187 :     (* project the list of read fds and list of write fds from the I/O wait list. *)
(* projIO (): split the descriptors of the clean entries of the I/O wait
 * list into (read fds, write fds).  Dirty entries are ignored. *)
fun projIO () = let
      fun split (items : io_item list, rd, wr) = (case items
            of [] => (rd, wr)
             | ({dirty = ref true, ...} :: rest) => split (rest, rd, wr)
             | ({io_op = IO_RD, fd, ...} :: rest) => split (rest, fd :: rd, wr)
             | ({io_op = IO_WR, fd, ...} :: rest) => split (rest, rd, fd :: wr)
            (* end case *))
      in
        split (!ioWaitList, [], [])
      end
196 :    
197 :     (* check for available I/O operations *)
(* checkIO shouldBlock: poll the descriptors of the clean I/O wait entries
 * (via projIO and pollFDs, passing shouldBlock through as the blocking
 * flag).  Every waiting entry whose descriptor is reported ready for its
 * operation is enqueued on the ready queue with its `kont`, marked dirty,
 * and removed from ioWaitList; dirty entries are dropped as well.
 *
 * If the poll raises SystemCall, each entry's descriptor is probed with
 * ftype; entries whose probe raises are enqueued with their `err_kont`,
 * marked dirty and removed, and then checkIO is retried. *)
198 :     fun checkIO shouldBlock = (case projIO()
199 :     of ([], []) => ()
200 :     | (rd, wr) => (case pollFDs(rd, wr, shouldBlock)
201 :     of ([], []) => ()
202 :     | (rd, wr) => let
(* f rebuilds the wait list, keeping only entries that are still waiting *)
203 :     fun f ([], l) = l
204 :     | f (({dirty = ref true, ...} : io_item)::r, l) = f (r, l)
205 :     | f ((x as {io_op, fd, kont, id, dirty, ...})::r, l) = let
(* look: is fd among the ready descriptors?  If so, wake the waiter. *)
206 :     fun look [] = false
207 :     | look (x::r) = if (x = fd)
208 :     then (
209 :     enqueue(id, kont);
210 :     dirty := true;
211 :     true)
212 :     else (look r)
213 :     in
214 :     if (look(case io_op of IO_RD => rd | IO_WR => wr))
215 :     then f(r, l)
216 :     else f(r, x::l)
217 :     end
218 :     in
219 :     ioWaitList := f(!ioWaitList, [])
220 :     end
221 :     (* end case *))
222 :     (* end case *))
223 :     handle (System.Unsafe.CInterface.SystemCall _) => let
224 :     open System.Unsafe.SysIO
(* testDesc fd is true when probing fd raises an exception *)
225 :     fun testDesc fd = (ftype(DESC fd); false) handle _ => true
226 :     fun findBadDescs ([], l) = l
227 :     | findBadDescs ((x as {fd, dirty, err_kont, id, ...} : io_item)::r, l) =
228 :     if (testDesc fd)
229 :     then (
230 :     enqueue(id, err_kont);
231 :     dirty := true;
232 :     findBadDescs (r, l))
233 :     else findBadDescs (r, x::l)
234 :     in
235 :     ioWaitList := findBadDescs(!ioWaitList, []);
236 :     checkIO shouldBlock
237 :     end
238 :    
239 :     (* return true if there is at least one clean I/O wait event on the list *)
(* waitingForIO (): drop the leading dirty entries from the I/O wait list,
 * store what is left, and report whether anything remains. *)
fun waitingForIO () = let
      fun dropDirty (({dirty = ref true, ...} : io_item) :: rest) = dropDirty rest
        | dropDirty items = items
      val remaining = dropDirty (!ioWaitList)
      in
        ioWaitList := remaining;
        case remaining of [] => false | _ => true
      end
248 :    
249 :    
250 :     (** Timer waiting queues **)
(* One entry of the timeout wait list: the time being waited for, the
 * waiting thread's id and continuation, and a dirty flag (entries whose
 * flag holds true are skipped by the functions that scan the list).
 *
 * The time field is written `Time.time`: the bare name `time` is not bound
 * by anything visible in this file (structure T exports no type and `Time`
 * is never opened). *)
datatype time_wait_t = TIMEWAIT of {
    wait_time : Time.time,
    id : thread_id,
    kont : unit cont,
    dirty : bool ref
  }

(* the list of pending timeouts, initially empty *)
val timeWaitList = ref ([] : time_wait_t list)
258 :    
259 :     (* insert a timeout event *)
(* insTimeWait (tim, id, k, flg): insert a timeout entry into timeWaitList
 * in front of the first clean entry that `tim` is earlier than (or at the
 * end).  Dirty entries met during the scan are dropped. *)
fun insTimeWait (tim, id, k, flg) = let
      val newItem = TIMEWAIT{wait_time=tim, id=id, kont=k, dirty=flg}
      fun ins waiters = (case waiters
            of [] => [newItem]
             | (TIMEWAIT{dirty = ref true, ...} :: rest) => ins rest
             | ((w as TIMEWAIT{wait_time, ...}) :: rest) =>
                 if earlier (tim, wait_time)
                   then newItem :: waiters
                   else w :: ins rest
            (* end case *))
      in
        timeWaitList := ins (!timeWaitList)
      end
271 :    
272 :     (* schedule any threads waiting for times earlier than the current time. *)
(* remTimeWait (): walk timeWaitList from the front; every clean entry whose
 * wait time is not later than the current time is enqueued on the ready
 * queue and marked dirty.  The scan stops at the first clean entry that is
 * still in the future; that entry and the rest of the list are kept. *)
fun remTimeWait () = let
      val now = currentTime()
      fun wake waiters = (case waiters
            of [] => []
             | (TIMEWAIT{dirty = ref true, ...} :: rest) => wake rest
             | (TIMEWAIT{wait_time, id, kont, dirty} :: rest) =>
                 if earlier (now, wait_time)
                   then waiters
                   else (enqueue (id, kont); dirty := true; wake rest)
            (* end case *))
      in
        timeWaitList := wake (!timeWaitList)
      end
284 :    
285 :     (* return true if there is at least one clean timeout event on the list *)
(* waitingForTimeout (): drop the leading dirty entries from the timeout
 * list, store what is left, and report whether anything remains. *)
fun waitingForTimeout () = let
      fun dropDirty (TIMEWAIT{dirty = ref true, ...} :: rest) = dropDirty rest
        | dropDirty items = items
      val remaining = dropDirty (!timeWaitList)
      in
        timeWaitList := remaining;
        case remaining of [] => false | _ => true
      end
294 :    
295 :    
296 :     (** Process scheduling (and atomic regions) **)
297 :    
298 :     (* test for blocked threads that could conceivably become unblocked *)
(* checkWaitingThreads (): if there are I/O waiters do a non-blocking I/O
 * check, then if there are timeout waiters schedule the expired ones. *)
fun checkWaitingThreads () = let
      val _ = (case !ioWaitList of [] => () | _ => checkIO false)
      in
        case !timeWaitList of [] => () | _ => remTimeWait ()
      end
302 :    
303 :     (* atomic regions (just SIGALRM for now) *)
(* Atomic-region support.  `atomicState` records whether the code is outside
 * an atomic region, inside one, or inside one with a signal noted
 * (SignalPending, set by the SIGALRM handler installed by initAtomic).
 * The functions exported from this `local` are atomicBegin,
 * atomicSwitchToThread, atomicEnd, atomicDispatch, dispatch, atomicThrow
 * and initAtomic. *)
304 :     local
305 :     open System.Signals
306 :     datatype atomic_state = NonAtomic | Atomic | SignalPending
307 :     val atomicState = ref NonAtomic
308 :     fun inAtomicRegion () = (case !atomicState of NonAtomic => false | _ => true)
309 :     fun signalPending () = (case !atomicState of SignalPending => true | _ => false)
310 :     in
311 :     (* begin an atomic region *)
312 :     fun atomicBegin () = (atomicState := Atomic)
313 :     (* Switch control to a thread, while leaving the atomic region *)
314 :     fun atomicSwitchToThread (id, kont) = (
315 :     setCurThread id;
316 :     atomicState := NonAtomic;
317 :     throw kont ())
318 :     (* end an atomic region *)
(* If a signal was noted during the region, check the waiting threads and
 * switch to the next ready thread (the caller is re-queued by
 * switchCurThread); otherwise just clear the state. *)
319 :     fun atomicEnd () = if signalPending()
320 :     then (
321 :     checkWaitingThreads();
322 :     callcc (fn k => (atomicSwitchToThread (switchCurThread k))))
323 :     else atomicState := NonAtomic
324 :     (* dispatch a thread while exiting an atomic region *)
325 :     fun atomicDispatch () = let
326 :     val _ = if signalPending() then checkWaitingThreads() else ();
327 :     (* wait for I/O or delay when there are no ready threads. *)
(* nothing to wait for: call the shutdown hook; only I/O waiters: turn the
 * timer off around a blocking I/O check; otherwise pause for a signal and
 * then check the waiting threads. *)
328 :     fun waitForSomething () = (case (waitingForIO(), waitingForTimeout())
329 :     of (false, false) => (!shutdown)()
330 :     | (_, false) => (timerOff(); checkIO true; restartTimer())
331 :     | _ => (System.Signals.pause(); checkWaitingThreads()))
(* take the next entry from the ready queue, waiting while it is empty *)
332 :     fun dequeue () = (case rdyQ
333 :     of (Q{front = ref [], rear = ref []}) => (waitForSomething(); dequeue())
334 :     | (Q{front = front as (ref []), rear = rear as (ref l)}) => let
335 :     val (x::r) = reverse(l, [])
336 :     in
337 :     front := r; rear := []; x
338 :     end
339 :     | (Q{front = front as (ref(x::r)), ...}) => (front := r; x))
340 :     in
341 :     atomicSwitchToThread (dequeue())
342 :     end
(* dispatch (): enter an atomic region and dispatch the next ready thread *)
343 :     fun dispatch () = (atomicBegin(); atomicDispatch())
344 :     (* throw to a continuation while exiting an atomic region *)
(* With a signal noted, the waiting threads are checked and the caller is
 * re-queued first; the throw to k happens when the caller is resumed. *)
345 :     fun atomicThrow (k, x)= if signalPending()
346 :     then (
347 :     checkWaitingThreads();
348 :     callcc (fn k1 => (atomicSwitchToThread(switchCurThread k1)));
349 :     throw k x)
350 :     else (
351 :     atomicState := NonAtomic;
352 :     throw k x)
353 :     (* initialize the atomic region support *)
354 :     fun initAtomic () = let
(* checkIOKont is the continuation k2 captured just before the
 * "checkWaitingThreads(); atomicDispatch()" sequence: throwing to it runs
 * that sequence. *)
355 :     val checkIOKont = callcc (fn k1 => (
356 :     callcc (fn k2 => (throw k1 k2));
357 :     (* NOTE: we start in an atomic region *)
358 :     checkWaitingThreads(); atomicDispatch()))
(* Handler: inside an atomic region, only note the signal and resume k;
 * otherwise queue the interrupted thread, enter an atomic region, and
 * continue at checkIOKont. *)
359 :     fun alrm_handler (_, k) = if inAtomicRegion()
360 :     then (atomicState := SignalPending; k)
361 :     else (enqueueCurThread k; atomicBegin(); checkIOKont)
362 :     in
363 :     setHandler (SIGALRM, SOME alrm_handler);
364 :     atomicState := NonAtomic
365 :     end
366 :     end (* local *)
367 :    
368 :     (* return the # of threads created and the length of the ready queue. *)
(* load (): return (number of thread ids handed out so far, number of
 * entries on the ready queue), measured inside an atomic region. *)
fun load () = let
      val Q{front, rear} = rdyQ
      val _ = atomicBegin()
      val queued = List.length(!front) + List.length(!rear)
      val res = (!nextId, queued)
      in
        atomicEnd(); res
      end
376 :    
377 :    
378 :     (** Condition variables **)
exception WriteTwice

(* condVar (): a fresh condition variable, unset and with no waiters *)
fun condVar () = let
      val state = ref (COND_unset [])
      in
        COND state
      end
(* writeVar (cv, x): set the condition variable to x and make every waiter
 * whose flag is still false ready to run (marking its flag true); each such
 * waiter is queued with a continuation that, when resumed, throws x to the
 * waiter's own continuation.  Raises WriteTwice if the variable is already
 * set.  The whole operation runs in an atomic region. *)
fun writeVar (COND rc, x) = let
      fun wake [] = ()
        | wake ((_, ref true, _) :: rest) = wake rest
        | wake ((id, flg, kont) :: rest) = let
            (* resumeK: a unit continuation that delivers x to kont *)
            val resumeK = callcc (fn k1 => (
                  callcc (fn k2 => throw k1 k2);
                  throw kont x))
            in
              enqueue (id, resumeK);
              flg := true;
              wake rest
            end
      in
        atomicBegin();
        case !rc
         of (COND_unset waiters) => (
              rc := (COND_set x);
              wake waiters;
              atomicEnd ())
          | _ => (atomicEnd(); raise WriteTwice)
        (* end case *)
      end
(* addCondWaiter (waiters, w): append w to the waiter list, dropping every
 * existing waiter whose flag already holds true. *)
fun addCondWaiter (waiters, w) = (case waiters
      of [] => [w]
       | ((_, ref true, _) :: r) => addCondWaiter (r, w)
       | (x :: r) => x :: addCondWaiter (r, w)
      (* end case *))
(* readVar cv: return the value of the condition variable.  If it is not
 * yet set, the current thread is added to its waiter list and another
 * thread is dispatched. *)
fun readVar (COND rc) = let
      val _ = atomicBegin()
      in
        case !rc
         of (COND_unset pl) => let
              fun wait k = (
                    rc := COND_unset(addCondWaiter(pl, (getTid(), ref false, k)));
                    atomicDispatch())
              in
                callcc wait
              end
          | (COND_set x) => (atomicEnd(); x)
        (* end case *)
      end
(* readVarEvt cv: the event form of readVar.
 *   poll  - true once the variable is set
 *   do    - leave the atomic region, run the abort action, return the value
 *   block - register the current thread as a waiter (with the shared dirty
 *           flag) and call `next`; when the waiter is later resumed with a
 *           value, the abort action (if any) runs before it is returned *)
fun readVarEvt (COND rc) = let
      fun isSet () = (case !rc of (COND_set _) => true | _ => false)
      fun doFn abortfn = (case !rc
            of (COND_set x) => (atomicEnd(); applyAbortFn abortfn; x)
             | _ => error "[readVarEvt.dofn]"
            (* end case *))
      fun blockFn (dirty, abortfn, next) = let
            fun block k = (case !rc
                  of (COND_unset pl) => (
                       rc := COND_unset(addCondWaiter(pl, (getTid(), dirty, k)));
                       next(); error "[readVarEvt]")
                   | _ => error "[readVarEvt.blockfn]"
                  (* end case *))
            in
              case abortfn
               of NO_ABORT => (callcc block)
                | (ABORT f) => let
                    val v = callcc block
                    in
                      f (); v
                    end
            end
      in
        mkBaseEvt {
            pollfn = isSet, dofn = doFn, blockfn = blockFn, abortfn = NO_ABORT
          }
      end
430 :    
431 :    
432 :     (** Channel operations **)
(* insert (q, flg, item): add item, tagged with its flag, to channel queue q *)
fun insert (q : 'a chanq, flg, item) = let
      val entry = (flg, item)
      in
        queueIns (q, entry)
      end

(* remove q: take the oldest entry of q, set its flag, and return its item *)
fun remove (q : 'a chanq) = (case queueRem q
      of (flg, item) => (flg := true; item)
      (* end case *))
439 :    
440 :     (* Clean a channel of satisfied transactions. We do this incrementally to
441 :     * give an amortized constant cost. Basically we guarantee that the front
442 :     * of the queue will be unsatisfied. Return true if the resulting queue
443 :     * is non-empty.
444 :     *)
local
  (* drop the leading run of entries whose flag already holds true *)
  fun clean' ((ref true, _) :: r) = clean' r
    | clean' l = l
  (* the rear list in arrival order, with leading flagged entries dropped *)
  fun cleanRear rear = clean' (reverse (!rear, []))
in
(* clean q: discard flagged entries from the head of q (refilling `front`
 * from `rear` when it runs out) and return true iff q is then non-empty. *)
fun clean ((Q{front, rear}) : 'a chanq) = (case (!front, !rear)
      of ([], []) => false
       | ([], _) => (case cleanRear rear
            of [] => (rear := []; false)
             | l => (front := l; rear := []; true))
       | (f, _) => (case clean' f
            of [] => (case cleanRear rear
                 of [] => (front := []; rear := []; false)
                  | l => (front := l; rear := []; true))
             | l => (front := l; true))
      (* end case *))

(* cleanAndRemove q: like clean, but also remove the first unflagged entry:
 * its flag is set and SOME of its item is returned; NONE if q has no
 * unflagged entry at its head after cleaning. *)
fun cleanAndRemove ((Q{front, rear}) : 'a chanq) = let
      (* commit to an entry: set its flag and hand back its item *)
      fun claim (flg, item) = (flg := true; SOME item)
      in
        case (!front, !rear)
         of ([], []) => NONE
          | ([], _) => (case cleanRear rear
               of [] => (rear := []; NONE)
                | (entry :: rest) => (front := rest; rear := []; claim entry))
          | (f, _) => (case clean' f
               of [] => (case cleanRear rear
                    of [] => (front := []; rear := []; NONE)
                     | (entry :: rest) => (front := rest; rear := []; claim entry))
                | (entry :: rest) => (front := rest; claim entry))
        (* end case *)
      end
end (* local *)
473 :    
474 :     (* remove any waiting threads from a channel's queues *)
(* resetChan ch: empty both queues of the channel *)
fun resetChan (CHAN{inq, outq}) = let
      val Q{front=f1, rear=r1} = inq
      val Q{front=f2, rear=r2} = outq
      in
        f1 := []; r1 := []; f2 := []; r2 := []
      end

(* channel : unit -> '1a chan *)
fun channel () = let
      val inq = queueNew()
      val outq = queueNew()
      in
        CHAN{inq = inq, outq = outq}
      end

(* sameChannel : ('a chan * 'a chan) -> bool
 * two channels are the same iff their input queues share the `front` ref *)
fun sameChannel (ch1, ch2) = let
      fun inFront (CHAN{inq=Q{front, ...}, ...}) = front
      in
        inFront ch1 = inFront ch2
      end
484 :    
485 :     (* send : ('a chan * 'a) -> unit *)
(* send (ch, msg): if a receiver is waiting on ch, queue the sender as
 * ready, make the receiver the current thread and throw msg to it;
 * otherwise park the sender on the channel's output queue and dispatch
 * another thread. *)
fun send (CHAN{inq, outq}, msg) = let
      fun doSend send_k = (
            atomicBegin();
            case (cleanAndRemove inq)
             of NONE => (
                  insert(outq, ref false, (getTid(), msg, send_k));
                  atomicDispatch())
              | SOME(rid, rkont) => (
                  enqueueCurThread send_k;
                  setCurThread rid;
                  atomicThrow (rkont, msg))
            (* end case *))
      in
        callcc doSend
      end

(* curried form of send *)
fun sendc ch msg = send (ch, msg)
498 :    
499 :     (* accept : 'a chan -> 'a *)
(* accept ch: if a sender is waiting on ch, make it ready and return its
 * message; otherwise park the caller on the channel's input queue and
 * dispatch another thread. *)
fun accept (CHAN{inq, outq}) = let
      fun doAccept accept_k = (
            atomicBegin();
            case (cleanAndRemove outq)
             of NONE => (
                  insert(inq, ref false, (getTid(), accept_k));
                  atomicDispatch())
              | SOME(sid, msg, skont) => (
                  enqueue (sid, skont);
                  atomicThrow (accept_k, msg))
            (* end case *))
      in
        callcc doAccept
      end
510 :    
511 :     (* transmit : ('a chan * 'a) -> unit event *)
(* transmit (ch, msg): the event form of send.
 *   poll  - true iff a receiver is waiting (after cleaning the input queue)
 *   do    - hand msg to the first waiting receiver; the abort action, if
 *           any, runs once the sender is resumed
 *   block - park the sender on the output queue with the shared flag and
 *           call `next`; the abort action, if any, runs on resumption *)
fun transmit (CHAN{inq, outq}, msg) = let
      fun canSend () = (clean inq)
      fun commit abortfn = let
            val (rid, rkont) = remove inq
            fun handoff k = (
                  enqueueCurThread k;
                  setCurThread rid;
                  atomicThrow(rkont, msg))
            in
              callcc handoff;
              applyAbortFn abortfn
            end
      fun park (flg, abortfn, next) = let
            fun block k = (
                  clean outq;
                  insert(outq, flg, (getTid(), msg, k));
                  next(); error "[transmit]")
            in
              callcc block;
              applyAbortFn abortfn
            end
      in
        mkBaseEvt {
            pollfn = canSend, dofn = commit, blockfn = park, abortfn = NO_ABORT
          }
      end

(* curried form of transmit *)
fun transmitc ch msg = transmit (ch, msg)
541 :    
542 :     (* receive : 'a chan -> 'a event *)
(* receive ch: the event form of accept.
 *   poll  - true iff a sender is waiting (after cleaning the output queue)
 *   do    - take the first waiting sender, make it ready, leave the atomic
 *           region, run the abort action and return the message
 *   block - park the caller on the input queue with the shared flag and
 *           call `next`; when a message arrives, the abort action (if any)
 *           runs before the message is returned *)
fun receive (CHAN{inq, outq}) = let
      fun canReceive () = (clean outq)
      fun commit abortfn = (case remove outq
            of (sid, msg, skont) => (
                 enqueue (sid, skont);
                 atomicEnd ();
                 applyAbortFn abortfn;
                 msg)
            (* end case *))
      fun park (flg, abortfn, next) = let
            fun block k = (
                  clean inq;
                  insert(inq, flg, (getTid(), k));
                  next(); error "[receive]")
            in
              case abortfn
               of NO_ABORT => (callcc block)
                | (ABORT f) => let
                    val msg = callcc block
                    in
                      f (); msg
                    end
            end
      in
        mkBaseEvt {
            pollfn = canReceive, dofn = commit, blockfn = park, abortfn = NO_ABORT
          }
      end
568 :    
569 :     (* A channel to pass errors on to an error monitor *)
(* Carries (thread id, exception) pairs; the spawn functions in this file
 * send on it when a thread body raises an exception. *)
570 :     val errCh : (thread_id * exn) chan = channel()
571 :    
572 :    
573 :     (** Thread operations **)
(* notify (): set the current thread's death condition *)
fun notify () = (case getTid()
      of (TID{death_cond, ...}) => writeVar (death_cond, ())
      (* end case *))
(* spawnc f x: create a thread that evaluates f x and return its id.  The
 * parent is put on the ready queue and the child starts running at once.
 * When f x finishes the thread's death condition is set; if f x raises an
 * exception, the death condition is set and (thread id, exception) is sent
 * on errCh.  Either way the thread then dispatches. *)
fun spawnc f x = let
      val _ = atomicBegin()
      val id = newId()
      fun child parent_k = (
            enqueueCurThread parent_k;
            setCurThread id;
            atomicEnd();
            (f x; notify())
              handle ex => (notify (); send (errCh, ((getTid()), ex)));
            dispatch ())
      in
        callcc child;
        id
      end

(* spawn f: spawnc with a unit argument *)
fun spawn f = spawnc f ()
593 :    
(* fastSpawn f: create a thread that evaluates f (), without returning its
 * id.  Unlike spawnc, the child is put on the ready queue and the parent
 * keeps running.  Completion and exceptions are handled as in spawnc. *)
fun fastSpawn f = let
      val _ = atomicBegin()
      val id = newId()
      fun body parentK = (
            (* queue the child at this point, then go back to the parent *)
            callcc (fn childK => (enqueue (id, childK); atomicThrow (parentK, ())));
            (f (); notify())
              handle ex => (notify (); send (errCh, (getTid(), ex)));
            dispatch ())
      in
        callcc body
      end
(* spawnList l: fastSpawn every thunk in l, in list order.
 * Written with an explicit argument so that the binding is a function
 * declaration: `app fastSpawn` is an application, not a syntactic value, so
 * under the SML'97 value restriction its type could not be generalized. *)
fun spawnList l = app fastSpawn l
605 :    
606 :     (* terminate the current thread *)
(* terminate the current thread: set its death condition, then dispatch *)
fun exit () = let
      val _ = notify()
      in
        dispatch()
      end

(* threadWait tid: an event on the death condition of thread tid *)
fun threadWait tid = (case tid
      of (TID{death_cond, ...}) => readVarEvt death_cond
      (* end case *))
610 :    
611 :     (* test thread_ids for equality *)
(* test thread_ids for equality (by integer id) *)
fun sameThread (TID a, TID b) = (#id a = #id b)

(* test the order of thread ids (by integer id) *)
fun tidLessThan (TID a, TID b) = (#id a < #id b)

(* return a string representation of a thread id: the id in brackets *)
fun tidToString (TID{id, ...}) = let
      val digits = makestring id
      in
        concat ["[", digits, "]"]
      end
617 :    
618 :     (* yield control to the next thread *)
(* yield control: re-queue the current thread and dispatch the next one *)
fun yield () = let
      fun park k = (atomicBegin(); enqueueCurThread k; atomicDispatch())
      in
        callcc park
      end
621 :    
622 :    
623 :     (** Event operations **)
(* placeholder for a function that must never be called: reports an
 * internal error whatever its argument *)
val dummyFn = fn _ => error "[dummyFn]"
625 :    
626 :     (* always : 'a -> 'a event *)
(* always x: an event that is always ready; its do-function runs the abort
 * action it is given and returns x.  The block function is dummyFn. *)
fun always x = let
      fun ready () = true
      fun commit abort = (applyAbortFn abort; x)
      in
        mkBaseEvt {
            pollfn = ready, dofn = commit, blockfn = dummyFn, abortfn = NO_ABORT
          }
      end

(* the always-ready unit event *)
val ALWAYS = always ()
634 :    
635 :     (* 'a event list -> 'a event *)
(* choose l: combine a list of events into one.  The base events of all EVT
 * members are merged and the abort flag is the "or" of their flags.  If l
 * contains guards, the result is itself a guard which, when forced, forces
 * every collected guard and chooses again over the results together with
 * the merged base events. *)
fun choose l = let
      (* note that the guard list `guards` is built in reverse order *)
      fun gather (evts, bevs, guards, hasAbort) = (case evts
            of [] => (case guards
                 of [] => EVT(bevs, hasAbort)
                  | _ => GUARD(fn () =>
                      choose (EVT(bevs, hasAbort) :: (revmap (fn g => (g ())) guards))))
             | (EVT(el', flg) :: r) =>
                 gather (r, el' @ bevs, guards, flg orelse hasAbort)
             | ((GUARD g) :: r) => gather (r, bevs, g :: guards, hasAbort)
            (* end case *))
      in
        gather (l, [], [], false)
      end
649 :    
650 :     (* guard : (unit -> 'a event) -> 'a event *)
(* guard g: an event that is computed by calling g *)
fun guard g = GUARD g
652 :    
653 :     (* wrap : ('a event * ('a -> 'b)) -> 'b event *)
(* wrap (ev, f): an event whose result is f applied to the result of ev.
 * For a guard the wrapping is deferred until the guard is forced.  The
 * base events of the result are in the reverse of their original order. *)
fun wrap (ev, f) = (case ev
      of (GUARD g) => GUARD(fn () => wrap(g (), f))
       | (EVT(el, flg)) => let
           fun wrapOne (BASE_EVT{pollfn, dofn, blockfn, abortfn}) = BASE_EVT{
                   pollfn = pollfn,
                   dofn = (f o dofn),
                   blockfn = (f o blockfn),
                   abortfn = abortfn
                 }
           fun loop ([], acc) = acc
             | loop (bev :: rest, acc) = loop (rest, wrapOne bev :: acc)
           in
             EVT(loop (el, []), flg)
           end
      (* end case *))
670 :    
671 :     (* wrapHandler : ('a event * (exn -> 'a)) -> 'a event *)
(* wrapHandler (ev, h): an event that behaves like ev, except that an
 * exception raised by a base event's do- or block-function is passed to h
 * and h's result is used instead.  For a guard the wrapping is deferred
 * until the guard is forced.  The base events of the result are in the
 * reverse of their original order. *)
fun wrapHandler (ev, h) = (case ev
      of (GUARD g) => GUARD(fn () => wrapHandler(g (), h))
       | (EVT(el, flg)) => let
           fun guarded act arg = (act arg) handle e => (h e)
           fun loop ([], acc) = acc
             | loop ((BASE_EVT{pollfn, dofn, blockfn, abortfn}) :: rest, acc) = let
                 val bev = BASE_EVT{
                         pollfn = pollfn,
                         dofn = (guarded dofn),
                         blockfn = (guarded blockfn),
                         abortfn = abortfn
                       }
                 in
                   loop (rest, bev :: acc)
                 end
           in
             EVT(loop (el, []), flg)
           end
      (* end case *))
689 :    
690 :     (* wrapAbort : ('a event * (unit -> unit)) -> 'a event *)
(* wrapAbort (ev, abort): attach the abort action `abort` to ev.
 *   - guard: deferred until the guard is forced
 *   - no base events: the empty event
 *   - one base event: the action is attached to it directly
 *   - several: the result is a guard that creates an acknowledgement
 *     channel; each follower's abort action sends on it, and the leader's
 *     abort action accepts one message per follower before calling `abort`
 * When a base event already has an abort action, the new one is spawned
 * with fastSpawn and the existing one is then called. *)
fun wrapAbort (ev, abort) = (case ev
      of (GUARD g) => GUARD(fn () => wrapAbort (g (), abort))
       | (EVT(el, _)) => let
           fun attach (BASE_EVT{pollfn, dofn, blockfn, abortfn}, newAbort) = let
                 val combined = (case abortfn
                        of NO_ABORT => ABORT newAbort
                         | (ABORT old) => ABORT(fn () => (fastSpawn newAbort; old ()))
                       (* end case *))
                 in
                   BASE_EVT{
                       pollfn = pollfn, dofn = dofn, blockfn = blockfn,
                       abortfn = combined
                     }
                 end
           in
             case el
              of [] => EVT([], false)
               | [bev] => EVT([attach (bev, abort)], true)
               | (leader :: followers) => let
                   val n = length followers
                   fun mkEvt () = let
                         val ackCh = channel()
                         fun followerAbort () = send(ackCh, ())
                         fun leaderAbort 0 = abort()
                           | leaderAbort i = (accept ackCh; leaderAbort(i-1))
                         val leader' = attach (leader, fn () => (leaderAbort n))
                         val followers' =
                               map (fn b => attach (b, followerAbort)) followers
                         in
                           EVT(leader' :: followers', true)
                         end
                   in
                     GUARD mkEvt
                   end
             (* end case *)
           end
      (* end case *))
721 :    
722 :     (** Sync and poll **)
723 :     local
724 :     (* Generate index numbers for "non-deterministic" selection. We use a
725 :     * round-robin style policy. *)
(* counter behind the round-robin selection; kept within 15 bits *)
val cnt = ref 0

(* random i: an index in 0 .. i-1, taken as the current counter value
 * modulo i; the counter is advanced on every call except when i = 1. *)
fun random 1 = 0
  | random i = let
      val j = !cnt
      val _ = (cnt := Bits.andb(j+1, 0x7fff))
      in
        j rem i
      end
(* selectDoFn (el, n): pick one of the n do-functions in el using `random`
 * and apply it to NO_ABORT. *)
fun selectDoFn (el, n) = let
      val idx = random n
      fun nth (f :: _, 0) = f NO_ABORT
        | nth (_ :: rest, k) = nth (rest, k - 1)
      in
        nth (el, idx)
      end
738 :    
(* syncOnEvts (bevs, hasAbort): synchronize on a list of base events.
 *   - empty list: the calling thread exits
 *   - one event: poll it; run its do-function if ready, else its block
 *     function with atomicDispatch as the `next` action
 *   - several events without abort actions (flag false)
 *   - several events, some with abort actions (flag true)
 * In the last two cases the events are polled in order inside an atomic
 * region.  If none is ready, every block function is run in turn with a
 * shared dirty flag, and atomicDispatch is called after the last one.  If
 * at least one is ready, one ready event is selected with `random`. *)
739 :     fun syncOnEvts ([], _) = exit()
740 :     | syncOnEvts ([BASE_EVT{pollfn, dofn, blockfn, ...}], _) = (
741 :     atomicBegin();
742 :     if (pollfn ())
743 :     then dofn NO_ABORT
744 :     else blockfn (ref false, NO_ABORT, atomicDispatch))
745 :     | syncOnEvts (bevs, false) = let
(* ext: no ready event seen so far; collects the block functions *)
746 :     fun ext ([], blockFns) = capture (fn k => let
747 :     val escape = escape k
748 :     val dirtyFlg = ref false
(* log: run each block function, chaining to the next through `next` *)
749 :     fun log [] = atomicDispatch ()
750 :     | log (blockfn :: r) = escape (
751 :     blockfn (dirtyFlg, NO_ABORT, fn () => log r))
752 :     in
753 :     log blockFns; error "[log]"
754 :     end)
755 :     | ext (BASE_EVT{pollfn, dofn, blockfn, ...} :: r, blockFns) =
756 :     if (pollfn ())
757 :     then extRdy (r, [dofn], 1)
758 :     else ext (r, blockfn::blockFns)
(* extRdy: at least one ready event seen; collects and counts do-functions *)
759 :     and extRdy ([], doFns, n) = selectDoFn (doFns, n)
760 :     | extRdy (BASE_EVT{pollfn, dofn, ...} :: r, doFns, n) =
761 :     if (pollfn ())
762 :     then extRdy (r, dofn::doFns, n+1)
763 :     else extRdy (r, doFns, n)
764 :     in
765 :     atomicBegin();
766 :     ext (bevs, [])
767 :     end
768 :     | syncOnEvts (bevs, true) = let
(* status of a polled base event: blocked or ready, with or without an
 * abort action of its own *)
769 :     datatype 'a bevt_status
770 :     = BLK of 'a block_fn_t
771 :     | BLK_ABORT of ('a block_fn_t * (unit -> unit))
772 :     | RDY of (abort_fn -> 'a)
773 :     | RDY_ABORT of ((abort_fn -> 'a) * (unit -> unit))
774 :     withtype 'a block_fn_t = (bool ref * abort_fn * (unit -> unit)) -> 'a
(* ext: no ready event seen so far *)
775 :     fun ext ([], sts) = let
776 :     fun projAbortFns [] = []
777 :     | projAbortFns (BLK_ABORT(_, abort)::r) = abort :: (projAbortFns r)
778 :     | projAbortFns (_::r) = projAbortFns r
779 :     val abortFns = projAbortFns sts
(* abort action for an event that has none of its own: spawn them all *)
780 :     val allAborts = ABORT(fn () => spawnList abortFns)
(* mkAbortFn i: spawn every abort action except the one at index i *)
781 :     fun mkAbortFn i = let
782 :     fun abort ([], _) = ()
783 :     | abort (a::r, j) = (
784 :     if (i <> j) then (fastSpawn a; ()) else ();
785 :     abort (r, j+1))
786 :     in
787 :     ABORT(fn () => abort(abortFns, 0))
788 :     end
789 :     in
790 :     capture (fn k => let
791 :     val escape = escape k
792 :     val dirtyFlg = ref false
(* log: run each block function in turn; i counts the BLK_ABORT entries
 * passed so far and so indexes into abortFns *)
793 :     fun log ([], _) = atomicDispatch ()
794 :     | log ((BLK bfn) :: r, i) = escape (
795 :     bfn (dirtyFlg, allAborts,
796 :     fn () => (log(r, i); error "[log]")))
797 :     | log ((BLK_ABORT(bfn, _)) :: r, i) = escape (
798 :     bfn (dirtyFlg, mkAbortFn i,
799 :     fn () => (log(r, i+1); error "[log]")))
800 :     | log _ = error "[log]"
801 :     in
802 :     log (sts, 0)
803 :     end)
804 :     end
805 :     | ext (BASE_EVT{pollfn, dofn, blockfn, abortfn} :: r, sts) = (
806 :     case (pollfn(), abortfn)
807 :     of (false, NO_ABORT) => ext (r, (BLK blockfn)::sts)
808 :     | (false, ABORT a) => ext (r, (BLK_ABORT(blockfn, a))::sts)
809 :     | (true, NO_ABORT) => extRdy (r, (RDY dofn)::sts, 1)
810 :     | (true, ABORT a) => extRdy (r, (RDY_ABORT(dofn, a))::sts, 1)
811 :     (* end case *))
(* extRdy: at least one ready event seen; nRdy counts the ready ones *)
812 :     and extRdy ([], sts, nRdy) = let
(* selAndAbortRest: walk sts to the selected ready entry, collecting the
 * abort actions of the entries passed over; abortRest then collects the
 * abort actions of the remaining entries and runs the selected
 * do-function with an abort action that spawns all of them *)
813 :     fun selAndAbortRest ([], _, _) = error "[selAndAbortRest]"
814 :     | selAndAbortRest ((BLK _)::r, i, abortFns) =
815 :     selAndAbortRest (r, i, abortFns)
816 :     | selAndAbortRest ((BLK_ABORT(_, abort))::r, i, abortFns) =
817 :     selAndAbortRest (r, i, abort::abortFns)
818 :     | selAndAbortRest ((RDY doFn)::r, 0, abortFns) =
819 :     abortRest (r, abortFns, doFn)
820 :     | selAndAbortRest ((RDY _)::r, i, abortFns) =
821 :     selAndAbortRest (r, i-1, abortFns)
822 :     | selAndAbortRest ((RDY_ABORT(doFn, _))::r, 0, abortFns) =
823 :     abortRest (r, abortFns, doFn)
824 :     | selAndAbortRest ((RDY_ABORT(_, abort))::r, i, abortFns) =
825 :     selAndAbortRest (r, i-1, abort::abortFns)
826 :     and abortRest ([], abortFns, doFn) =
827 :     doFn (ABORT(fn () => spawnList abortFns))
828 :     | abortRest ((BLK_ABORT(_, abort))::r, abortFns, doFn) =
829 :     abortRest (r, abort::abortFns, doFn)
830 :     | abortRest ((RDY_ABORT(_, abort))::r, abortFns, doFn) =
831 :     abortRest (r, abort::abortFns, doFn)
832 :     | abortRest (_::r, abortFns, doFn) =
833 :     abortRest (r, abortFns, doFn)
834 :     in
835 :     selAndAbortRest (sts, random nRdy, [])
836 :     end
837 :     | extRdy (BASE_EVT{pollfn, dofn, blockfn, abortfn} :: r, sts, nRdy) = (
838 :     case (pollfn(), abortfn)
839 :     of (false, NO_ABORT) => extRdy (r, sts, nRdy)
840 :     | (false, ABORT a) => extRdy (r, (BLK_ABORT(blockfn, a))::sts, nRdy)
841 :     | (true, NO_ABORT) => extRdy (r, (RDY dofn)::sts, nRdy+1)
842 :     | (true, ABORT a) => extRdy (r, (RDY_ABORT(dofn, a))::sts, nRdy+1)
843 :     (* end case *))
844 :     in
845 :     atomicBegin();
846 :     ext (bevs, [])
847 :     end
848 :     (* pollEvts (bevs, flg): non-blocking choice over the base events bevs; NONE when no event is ready. NOTE(review): flg presumably tells whether any event carries an abort action -- confirm against choose *)
849 :     fun pollEvts ([], _) = NONE
850 :       | pollEvts ([BASE_EVT{pollfn, dofn, ...}], _) = ( (* single event: poll it and run its dofn directly *)
851 :           atomicBegin();
852 :           if (pollfn ()) then SOME(dofn NO_ABORT) else (atomicEnd(); NONE))
853 :       | pollEvts (bevs, false) = let (* this case never looks at the events' abortfn fields *)
854 :           fun ext [] = (atomicEnd(); NONE) (* nothing was ready *)
855 :             | ext (BASE_EVT{pollfn, dofn, ...} :: r) =
856 :                 if (pollfn ()) then extRdy(r, [dofn], 1) else ext r
857 :           and extRdy ([], doFns, n) = SOME(selectDoFn (doFns, n)) (* n counts the dofns collected in doFns *)
858 :             | extRdy (BASE_EVT{pollfn, dofn, ...} :: r, doFns, n) =
859 :                 if (pollfn ())
860 :                   then extRdy (r, dofn::doFns, n+1)
861 :                   else extRdy (r, doFns, n)
862 :           in
863 :             atomicBegin();
864 :             ext bevs
865 :           end
866 :       | pollEvts (bevs, true) = let
867 :           datatype 'a bevt_status (* per-event poll result; local to this clause *)
868 :             = BLK_ABORT of (unit -> unit) (* not ready; carries its abort action *)
869 :             | RDY of (abort_fn -> 'a) (* ready; no abort action *)
870 :             | RDY_ABORT of ((abort_fn -> 'a) * (unit -> unit)) (* ready; with abort action *)
871 :           fun doAndAbort (sts, i) = let (* i: index among the ready entries of sts of the one to run; ~1 for none *)
872 :                 fun f ([], abortFns, _, NONE) = (spawnList abortFns; NONE) (* NOTE(review): no explicit atomicEnd here; presumably spawnList leaves the atomic region -- confirm *)
873 :                   | f ([], abortFns, _, SOME doFn) =
874 :                       SOME (doFn (ABORT(fn () => spawnList abortFns)))
875 :                   | f ((BLK_ABORT abort) :: r, abortFns, k, doFn) =
876 :                       f (r, abort::abortFns, k, doFn)
877 :                   | f ((RDY doFn) :: r, abortFns, 0, _) =
878 :                       f (r, abortFns, ~1, SOME doFn) (* ~1: the choice is made; no later ready entry may hit 0 and replace it *)
879 :                   | f ((RDY _) :: r, abortFns, k, doFn) =
880 :                       f (r, abortFns, k-1, doFn)
881 :                   | f ((RDY_ABORT(doFn, _)) :: r, abortFns, 0, _) =
882 :                       f (r, abortFns, ~1, SOME doFn) (* the chosen event's own abort action is deliberately not collected *)
883 :                   | f ((RDY_ABORT(_, abort)) :: r, abortFns, k, doFn) =
884 :                       f (r, abort::abortFns, k-1, doFn) (* unchosen ready entries count toward the index, as in selAndAbortRest of syncOnEvts *)
885 :                 in
886 :                   f (sts, [], i, NONE)
887 :                 end
888 :           fun ext ([], sts) = doAndAbort (sts, ~1) (* no event was ready *)
889 :             | ext (BASE_EVT{pollfn, dofn, abortfn, ...} :: r, sts) = (
890 :                 case (pollfn (), abortfn)
891 :                  of (false, NO_ABORT) => ext (r, sts)
892 :                   | (false, ABORT a) => ext (r, (BLK_ABORT a)::sts)
893 :                   | (true, NO_ABORT) => extRdy (r, (RDY dofn)::sts, 1)
894 :                   | (true, ABORT a) => extRdy (r, (RDY_ABORT(dofn, a))::sts, 1)
895 :                 (* end case *))
896 :           and extRdy ([], sts, n) = doAndAbort (sts, random n) (* n ready entries; random n is used as an index into them *)
897 :             | extRdy (BASE_EVT{pollfn, dofn, abortfn, ...} :: r, sts, n) = (
898 :                 case (pollfn (), abortfn)
899 :                  of (false, NO_ABORT) => extRdy (r, sts, n)
900 :                   | (false, ABORT a) => extRdy (r, (BLK_ABORT a)::sts, n)
901 :                   | (true, NO_ABORT) => extRdy (r, (RDY dofn)::sts, n+1)
902 :                   | (true, ABORT a) => extRdy (r, (RDY_ABORT(dofn, a))::sts, n+1)
903 :                 (* end case *))
904 :           in
905 :             atomicBegin();
906 :             ext (bevs, [])
907 :           end
908 :     (* evalGuard: run guard thunks repeatedly until an EVT value is reached, then return the EVT's argument *)
909 :     fun evalGuard evt = (case evt
910 :     of (EVT desc) => desc | (GUARD thunk) => evalGuard (thunk ()))
911 :     in
912 :     fun sync evt = syncOnEvts (evalGuard evt) (* evaluate any guards, then pass the result to syncOnEvts *)
913 :     val select = sync o choose (* sync applied to the choose of its argument; o is this file's own composition operator *)
914 :     fun poll evt = pollEvts (evalGuard evt) (* evaluate any guards, then poll without blocking; pollEvts yields NONE when nothing is ready *)
915 :     end (* local *)
916 :    
917 :     (* waitUntil : time -> unit event -- event for the absolute time t; the result is ALWAYS when the earlier(curTime, t) test fails *)
918 :     fun waitUntil (t as TIME{sec, usec}) = let (* sec and usec are bound by the pattern but not used below *)
919 :     val curTime = currentTime()
920 :     in
921 :     if earlier(curTime, t) (* presumably true when curTime precedes t -- confirm against the definition of earlier *)
922 :     then let
923 :     fun pollFn () = false (* this event never reports itself ready when polled *)
924 :     fun blockFn (flg, abort, next) = let
925 :     fun block k = ( (* k is the continuation captured by callcc below *)
926 :     insTimeWait(t, getTid(), k, flg); (* hand the deadline t, the current thread id, k and flg to insTimeWait *)
927 :     next()) (* NOTE(review): next presumably transfers control elsewhere and does not return here -- confirm *)
928 :     in
929 :     case abort
930 :     of NO_ABORT => (callcc block)
931 :     | (ABORT f) => (callcc block; f ()) (* f runs only after callcc block returns, i.e. after k has been resumed *)
932 :     end
933 :     in
934 :     mkBaseEvt {
935 :     pollfn = pollFn, dofn = dummyFn, blockfn = blockFn,
936 :     abortfn = NO_ABORT
937 :     }
938 :     end
939 :     else ALWAYS
940 :     end
941 :    
942 :     (* timeout : time -> unit event -- event for the relative interval t; the result is ALWAYS when earlier(t, zeroTime) holds *)
943 :     fun timeout (t as TIME{sec, usec}) = if earlier(t, zeroTime) (* sec and usec are bound by the pattern but not used below *)
944 :     then ALWAYS
945 :     else let
946 :     fun pollFn () = false (* this event never reports itself ready when polled *)
947 :     fun blockFn (flg, abort, next) = let
948 :     fun block k = ( (* k is the continuation captured by callcc below *)
949 :     insTimeWait(add_time (currentTime(), t), getTid(), k, flg); (* the deadline is computed here, when block runs, not when timeout is called *)
950 :     next()) (* NOTE(review): next presumably transfers control elsewhere and does not return here -- confirm *)
951 :     in
952 :     case abort
953 :     of NO_ABORT => (callcc block)
954 :     | (ABORT f) => (callcc block; f ()) (* f runs only after callcc block returns, i.e. after k has been resumed *)
955 :     end
956 :     in
957 :     mkBaseEvt {
958 :     pollfn = pollFn, dofn = dummyFn, blockfn = blockFn,
959 :     abortfn = NO_ABORT
960 :     }
961 :     end
962 :    
963 :    
964 :     (** Initialization **)
965 :    
966 :     (* initialize the internal queues, I/O waiting lists and counters; the steps below run in this order *)
967 :     fun initQueues () = let val Q{front, rear} = rdyQ (* take the two list refs out of rdyQ *)
968 :     in
969 :     front := []; rear := []; (* empty both halves of rdyQ *)
970 :     ioWaitList := [];
971 :     timeWaitList := [];
972 :     nextId := 0; (* reset before the newId() call on the next line; presumably newId reads nextId -- confirm *)
973 :     setCurThread (newId());
974 :     initAtomic()
975 :     end
976 :    
977 :     end (* functor ConcurML *)

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0