Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /cml/trunk/src/core-cml/barrier.sml
ViewVC logotype

Annotation of /cml/trunk/src/core-cml/barrier.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3573 - (view) (download)

(* barrier.sml
 *
 * COPYRIGHT (c) 2011 The Fellowship of SML/NJ (http://www.smlnj.org)
 * All rights reserved.
 *)
6 :    
structure Barrier :> BARRIER =
  struct

    structure S = Scheduler

    type 'a cont = 'a SMLofNJ.Cont.cont
    val callcc = SMLofNJ.Cont.callcc
    val throw = SMLofNJ.Cont.throw

  (* the outcome of applying the update function at a synchronization: either
   * the new global state or the exception that the update function raised.
   *)
    datatype 'a result = RAISE of exn | VALUE of 'a

  (* the representation of a barrier:
   *   state     -- the current global state
   *   update    -- function applied to the state at each synchronization
   *   nEnrolled -- number of enrollments that have not resigned
   *   nWaiting  -- number of enrolled threads currently blocked in wait
   *   waiting   -- the blocked threads, paired with their resumption
   *                continuations (most recent arrival first)
   *)
    datatype 'a barrier = BAR of {
        state : 'a ref,
        update : 'a -> 'a,
        nEnrolled : int ref,
        nWaiting : int ref,
        waiting : (S.thread_id * 'a result cont) list ref
      }

    datatype status = ENROLLED | WAITING | RESIGNED

    datatype 'a enrollment = ENROLL of {
        bar : 'a barrier,
        sts : status ref        (* current status of this enrollment *)
      }

  (* create a new barrier.  The first argument is the update function that
   * is applied to the global state whenever a barrier synchronization occurs.
   * The second argument is the initial global state.
   *)
    fun barrier update init = BAR{
            state = ref init,
            update = update,
            nEnrolled = ref 0,
            nWaiting = ref 0,
            waiting = ref []
          }

  (* enroll in a barrier; the enrollment count is bumped inside an atomic
   * region and a fresh enrollment in the ENROLLED state is returned.
   *)
    fun enroll (bar as BAR{nEnrolled, ...}) = (
          S.atomicBegin();
          nEnrolled := !nEnrolled + 1;
          S.atomicEnd();
          ENROLL{bar = bar, sts = ref ENROLLED})

  (* schedule a blocked thread so that, when it next runs, it resumes its
   * wait with the given result.  The nested callcc builds a unit continuation
   * whose only action is to throw the result to resumeK.
   *)
    fun wakeupThd result (tid, resumeK) =
          S.enqueueThread(
            tid, callcc(fn k => (callcc(fn k' => throw k k'); throw resumeK result)))

  (* convert a synchronization result into a value or a raised exception *)
    fun return (RAISE exn) = raise exn
      | return (VALUE x) = x

  (* complete a synchronization: apply the update function to the global state,
   * wake every blocked thread with the outcome, and reset the waiting
   * bookkeeping.  If the update function raises an exception, then the state
   * is left unchanged and the exception is delivered to the waiters.  The
   * outcome is also returned to the caller.
   * NOTE: this function must be called from inside an atomic region.
   *)
    fun release (BAR{state, update, nWaiting, waiting, ...}) = let
          val result = let
                val x = update(!state)
                in
                  state := x;
                  VALUE x
                end handle exn => RAISE exn
          in
            List.app (wakeupThd result) (!waiting);
            nWaiting := 0;
            waiting := [];
            result
          end

  (* synchronize on a barrier.  The calling thread blocks until every enrolled
   * thread is waiting; the result is the updated global state (or the
   * exception raised by the update function).  Raises Fail if the enrollment
   * is already waiting or has resigned.
   *)
    fun wait (ENROLL{bar as BAR{nEnrolled, nWaiting, waiting, ...}, sts}) = (
          S.atomicBegin();
          case !sts
           of ENROLLED => (
                sts := WAITING;
                nWaiting := !nWaiting+1;
                if (!nWaiting >= !nEnrolled)
                  then let (* all threads are at the barrier, so we can proceed *)
                    val result = release bar
                    in
                    (* the enrollment must be usable for the next round *)
                      sts := ENROLLED;
                      S.atomicEnd ();
                      return result
                    end
                  else let
                    val result = callcc (fn resumeK => (
                          waiting := (S.getCurThread(), resumeK) :: !waiting;
                          S.atomicDispatch()))
                    in
                    (* we have been released; restore our status (which only
                     * this enrollment's wait/resign operations examine) so that
                     * the enrollment can be used for the next synchronization.
                     *)
                      sts := ENROLLED;
                      return result
                    end)
            | WAITING => (S.atomicEnd(); raise Fail "multiple barrier waits")
            | RESIGNED => (S.atomicEnd(); raise Fail "barrier wait after resignation")
          (* end case *))

  (* resign from an enrolled barrier.  The enrollment no longer counts toward
   * synchronization; if all of the remaining enrolled threads are already
   * waiting, then the barrier is released on their behalf.  Resigning more
   * than once is a no-op; resigning while waiting raises Fail.
   *)
    fun resign (ENROLL{bar as BAR{nEnrolled, nWaiting, ...}, sts}) = (
          S.atomicBegin();
          case !sts
           of RESIGNED => S.atomicEnd() (* ignore multiple resignations *)
            | WAITING => (S.atomicEnd(); raise Fail "resign while waiting")
            | ENROLLED => (
                sts := RESIGNED;
                nEnrolled := !nEnrolled - 1;
              (* our departure may complete the synchronization for the others;
               * the outcome is for the waiters, not for the resigning thread.
               *)
                if (!nWaiting > 0) andalso (!nWaiting >= !nEnrolled)
                  then ignore (release bar)
                  else ();
                S.atomicEnd())
          (* end case *))

  (* get the current state of the barrier *)
    fun value (ENROLL{bar=BAR{state, ...}, ...}) = !state

  end

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0