Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cml/src/core-cml/event.sml
ViewVC logotype

Annotation of /sml/trunk/src/cml/src/core-cml/event.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 8 - (view) (download)
Original Path: sml/branches/SMLNJ/src/cml/src/core-cml/event.sml

1 : monnier 2 (* event.sml
2 :     *
3 :     * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
4 :     * COPYRIGHT (c) 1989-1991 John H. Reppy
5 :     *
6 :     * The representation of event values and the event combinators.
7 :     *
8 :     * Some important requirements on the implementation of base event values:
9 :     *
10 :     * 1) The pollFn, doFn, and blockFn are always called from inside
11 :     * atomic regions.
12 :     *
13 :     * 2) The pollFn returns an integer priority: this is 0 when not enabled,
14 :     * ~1 for fixed priority, and a positive value for dynamic priority.
15 :     * The standard scheme is to associate a counter with the underlying
16 :     * synchronization object, and to increase it by one for each
17 :     * synchronization attempt.
18 :     *
19 :     * 3) The blockFn is responsible for exiting the atomic region; the doFns
20 :     * should NOT leave the atomic region.
21 :     *
22 :     * 4) The blockFn is responsible for executing the "cleanUp" action
23 :     * prior to leaving the atomic region.
24 :     *)
25 :    
26 :     structure Event : sig
27 :    
28 :     include EVENT
29 :    
30 :     val atomicCVarSet : RepTypes.cvar -> unit
31 :     val cvarGetEvt : RepTypes.cvar -> unit event
32 :    
33 :     end = struct
34 :    
35 :     structure R = RepTypes
36 :     structure S = Scheduler
37 :    
38 :     val capture = SMLofNJ.Cont.capture
39 :     val escape = SMLofNJ.Cont.escape
40 :     val callcc = SMLofNJ.Cont.callcc
41 :     val throw = SMLofNJ.Cont.throw
42 :    
43 :     (* Some inline functions to improve performance *)
44 :     fun map f = let
45 :     fun mapf [] = []
46 :     | mapf [a] = [f a]
47 :     | mapf [a, b] = [f a, f b]
48 :     | mapf [a, b, c] = [f a, f b, f c]
49 :     | mapf (a::b::c::d::r) = (f a)::(f b)::(f c)::(f d)::(mapf r)
50 :     in
51 :     mapf
52 :     end
53 :     fun app f = let
54 :     fun appf [] = ()
55 :     | appf (x::r) = (f x; appf r)
56 :     in
57 :     appf
58 :     end
59 :     fun foldl f init l = let
60 :     fun foldf ([], accum) = accum
61 :     | foldf (x::r, accum) = foldf(r, f(x, accum))
62 :     in
63 :     foldf (l, init)
64 :     end
65 :    
66 :     fun error msg = raise Fail msg
67 :    
68 : monnier 8 datatype event_status = datatype RepTypes.event_status
69 :     type 'a base_evt = 'a RepTypes.base_evt
70 :     datatype event = datatype RepTypes.event
71 : monnier 2
72 :    
73 :     (** Condition variables. Because these variables are set inside atomic
74 :     ** regions, we have to use different conventions for clean-up, etc.
75 :     ** Instead of requiring the blockFn continuation to call the cleanUp
76 :     ** action and to leave the atomic region, we call the cleanUp function
77 :     ** when setting the condition variable (in atomicCVarSet), and have the
78 :     ** invariant that the blockFn continuation is dispatched outside the
79 :     ** atomic region.
80 :     **)
81 :    
82 :     (* set a condition variable; we assume that this function is always
83 :     * executed in an atomic region.
84 :     *)
85 : monnier 8 fun atomicCVarSet (R.CVAR state) = (case !state
86 :     of (R.CVAR_unset waiting) => let
87 :     val R.Q{rear, ...} = S.rdyQ1
88 :     fun add [] = !rear
89 :     | add ({transId=ref R.CANCEL, ...}::r) = add r
90 :     | add ({transId as ref(R.TRANS tid), cleanUp, kont}::r) = (
91 :     transId := R.CANCEL;
92 :     cleanUp();
93 :     (tid, kont) :: (add r))
94 :     in
95 :     state := R.CVAR_set 1;
96 :     rear := add waiting
97 :     end
98 :     | _ => error "cvar already set"
99 :     (* end case *))
100 : monnier 2
101 :     (* the event constructor for waiting on a cvar *)
102 :     fun cvarGetEvt (R.CVAR(state)) = let
103 :     fun blockFn {transId, cleanUp, next} = callcc (fn k => let
104 :     val (R.CVAR_unset waiting) = !state
105 :     val item = {transId=transId, cleanUp=cleanUp, kont=k}
106 :     in
107 :     state := R.CVAR_unset(item :: waiting);
108 :     next ()
109 :     end)
110 :     fun pollFn () = (case !state
111 : monnier 8 of (R.CVAR_set n) => let
112 :     fun doFn () = (state := R.CVAR_set 1; S.atomicEnd())
113 :     in
114 :     state := R.CVAR_set(n+1);
115 :     ENABLED{prio=n, doFn=doFn}
116 :     end
117 : monnier 2 | _ => BLOCKED blockFn
118 :     (* end case *))
119 :     in
120 :     BEVT[pollFn]
121 :     end
122 :    
123 :    
124 :     fun alwaysEvt v = BEVT[fn () => R.ENABLED{
125 :     prio= ~1, doFn=fn () => (S.atomicEnd(); v)
126 :     }]
127 :    
128 :     val never = BEVT[]
129 :    
130 :     val guard = GUARD
131 :    
132 :     val withNack = W_NACK
133 :    
134 :     fun choose (el : 'a event list) = let
135 :     fun gatherBEvts ([], l) = BEVT l
136 :     | gatherBEvts (BEVT[] :: r, l) = gatherBEvts (r, l)
137 :     | gatherBEvts (BEVT[bev] :: r, bevs') = gatherBEvts (r, bev::bevs')
138 :     | gatherBEvts (BEVT bevs :: r, bevs') = gatherBEvts (r, bevs @ bevs')
139 :     | gatherBEvts (evts, []) = gather (evts, [])
140 :     | gatherBEvts (evts, l) = gather (evts, [BEVT l])
141 :     and gather ([], [evt]) = evt
142 :     | gather ([], evts) = CHOOSE evts
143 :     | gather (CHOOSE evts :: r, evts') = gather (r, evts @ evts')
144 :     | gather (BEVT bevs :: r, BEVT bevs' :: r')
145 :     = gather (r, BEVT(bevs @ bevs') :: r')
146 :     | gather (evt :: r, evts') = gather (r, evt :: evts')
147 :     in
148 :     gatherBEvts (rev el, [])
149 :     end
150 :    
151 :     fun wrap (evt, wfn) = let
152 :     fun wrapBaseEvt pollFn () = (case pollFn()
153 :     of ENABLED{prio, doFn} => ENABLED{prio=prio, doFn = wfn o doFn}
154 :     | (BLOCKED blockFn) => BLOCKED(wfn o blockFn)
155 :     (* end case *))
156 :     fun wrap' (BEVT bevs) = BEVT(map wrapBaseEvt bevs)
157 :     | wrap' (CHOOSE evts) = CHOOSE(map wrap' evts)
158 :     | wrap' (GUARD g) = GUARD(fn () => wrap(g(), wfn))
159 :     | wrap' (W_NACK f) = W_NACK(fn evt => wrap(f evt, wfn))
160 :     in
161 :     wrap' evt
162 :     end
163 :    
164 :     fun wrapHandler (evt, hfn) = let
165 :     fun wrap f x = ((f x) handle exn => hfn exn)
166 :     fun wrapBaseEvt pollFn () = (case pollFn()
167 :     of ENABLED{prio, doFn} => ENABLED{prio=prio, doFn = wrap doFn}
168 :     | (BLOCKED blockFn) => BLOCKED(wrap blockFn)
169 :     (* end case *))
170 :     fun wrap' (BEVT bevs) = BEVT(map wrapBaseEvt bevs)
171 :     | wrap' (CHOOSE evts) = CHOOSE(map wrap' evts)
172 :     | wrap' (GUARD g) = GUARD(fn () => wrapHandler(g(), hfn))
173 :     | wrap' (W_NACK f) = W_NACK(fn evt => wrapHandler(f evt, hfn))
174 :     in
175 :     wrap' evt
176 :     end
177 :    
178 :     datatype 'a event_group
179 :     = BASE_GRP of 'a base_evt list
180 :     | GRP of 'a event_group list
181 :     | NACK_GRP of (R.cvar * 'a event_group)
182 :    
183 :     (* force the evaluation of any guards in an event group. *)
184 :     fun force (BEVT l) = BASE_GRP l
185 :     | force evt = let
186 :     fun force' (GUARD g) = force' (g ())
187 :     | force' (W_NACK f) = let
188 :     val cvar = R.CVAR(ref(R.CVAR_unset []))
189 :     in
190 :     NACK_GRP(cvar, force' (f (cvarGetEvt cvar)))
191 :     end
192 :     | force' (BEVT grp) = BASE_GRP grp
193 :     | force' (CHOOSE evts) = let
194 :     fun forceBL ([], bevs) = BASE_GRP bevs
195 :     | forceBL (evt::r, bevs') = (case (force' evt)
196 :     of (BASE_GRP bevs) => forceBL (r, bevs @ bevs')
197 :     | (GRP grp) => forceL (r, grp @ [BASE_GRP bevs'])
198 :     | grp => forceL (r, [grp, BASE_GRP bevs'])
199 :     (* end case *))
200 :     and forceL ([], [grp]) = grp
201 :     | forceL ([], l) = GRP l
202 :     | forceL (evt :: r, l) = (
203 :     case (force' evt, l)
204 :     of (BASE_GRP bevs, BASE_GRP bevs' :: r') =>
205 :     forceL (r, BASE_GRP(bevs @ bevs') :: r')
206 :     | (GRP grp, l) => forceL (r, grp @ l)
207 :     | (grp, l) => forceL (r, grp :: l)
208 :     (* end case *))
209 :     in
210 :     forceBL (evts, [])
211 :     end
212 :     in
213 :     force' evt
214 :     end
215 :    
216 :     local
217 :     val cnt = ref 0
218 :     fun random i = let val j = !cnt
219 :     in
220 :     if (j = 1000000) then cnt := 0 else cnt := j+1;
221 :     Int.rem(j, i)
222 :     end
223 :     in
224 :     fun selectDoFn ([(_, doFn)], _) = doFn
225 :     | selectDoFn (l, n) = let
226 :     fun priority ~1 = n
227 :     | priority p = p
228 :     fun max ((p, doFn)::r, maxP, k, doFns) = let
229 :     val p = priority p
230 :     in
231 :     if (p > maxP) then max(r, p, 1, [doFn])
232 :     else if (p = maxP) then max(r, maxP, k+1, doFn::doFns)
233 :     else max(r, maxP, k, doFns)
234 :     end
235 :     | max ([], _, k, [doFn]) = doFn
236 :     | max ([], _, k, doFns) = List.nth(doFns, random k)
237 :     in
238 :     max (l, 0, 0, [])
239 :     end
240 :     end
241 :    
242 :     fun mkFlg () = let val flg = ref(R.TRANS(S.getCurThread()))
243 :     in
244 :     (flg, fn () => flg := R.CANCEL)
245 :     end
246 :    
247 :     fun syncOnOneEvt (pollFn : 'a base_evt) = (
248 :     S.atomicBegin ();
249 :     case (pollFn())
250 :     of ENABLED{doFn, ...} => doFn()
251 :     | (BLOCKED blockFn) => let val (flg, setFlg) = mkFlg()
252 :     in
253 :     blockFn{transId=flg, cleanUp=setFlg, next=S.atomicDispatch}
254 :     end
255 :     (* end case *))
256 :    
257 :     (* this function handles the case of synchronizing on a list of
258 :     * base events (w/o any negative acknowledgements). It also handles
259 :     * the case of synchronizing on NEVER.
260 :     *)
261 :     fun syncOnBEvts [] = S.dispatch()
262 :     | syncOnBEvts [bev] = syncOnOneEvt bev
263 :     | syncOnBEvts bevs = let
264 :     fun ext ([], blockFns) = capture (fn k => let
265 :     val escape = escape k
266 :     val (transId, setFlg) = mkFlg()
267 :     fun log [] = S.atomicDispatch ()
268 :     | log (blockFn :: r) =
269 :     escape (blockFn {
270 :     transId = transId,
271 :     cleanUp = setFlg,
272 :     next = fn () => log r
273 :     })
274 :     in
275 :     log blockFns; error "[log]"
276 :     end)
277 :     | ext (pollFn :: r, blockFns) = (case pollFn()
278 :     of ENABLED{prio, doFn} => extRdy (r, [(prio, doFn)], 1)
279 :     | (BLOCKED blockFn) => ext (r, blockFn::blockFns)
280 :     (* end case *))
281 :     (** NOTE: maybe we should just keep track of the max priority?
282 :     ** What about fairness to fixed priority events (e.g., always, timeout?)
283 :     **)
284 :     and extRdy ([], doFns, n) = selectDoFn (doFns, n) ()
285 :     | extRdy (pollFn :: r, doFns, n) = (case pollFn()
286 :     of ENABLED{prio, doFn} => extRdy (r, (prio, doFn)::doFns, n+1)
287 :     | _ => extRdy (r, doFns, n)
288 :     (* end case *))
289 :     in
290 :     S.atomicBegin();
291 :     ext (bevs, [])
292 :     end
293 :    
294 :     (* walk the event group tree, collecting the base events (with associated
295 :     * ack flags), and a list of (cvar * ack flag set) pairs.
296 :     *)
297 :     fun collect grp = let
298 :     val unWrappedFlg = ref false
299 :     fun gatherWrapped (grp, bl, flgSets) = let
300 :     fun gather (BASE_GRP bevs, bl, allFlgs, flgSets) = let
301 :     fun append ([], bl, allFlgs) = (bl, allFlgs)
302 :     | append (bev::r, bl, allFlgs) = let
303 :     val flg = ref false
304 :     in
305 :     append (r, (bev, flg)::bl, flg::allFlgs)
306 :     end
307 :     val (bl', allFlgs') = append (bevs, [], allFlgs)
308 :     in
309 :     (bl', allFlgs', flgSets)
310 :     end
311 :     | gather (GRP grp, bl, allFlgs, flgSets) = let
312 :     fun f (grp', (bl', allFlgs', flgSets')) =
313 :     gather (grp', bl', allFlgs', flgSets')
314 :     in
315 :     foldl f (bl, allFlgs, flgSets) grp
316 :     end
317 :     | gather (NACK_GRP(cvar, grp), bl, allFlgs, flgSets) = let
318 :     val (bl', allFlgs', flgSets') =
319 :     gather (grp, bl, allFlgs, flgSets)
320 :     in
321 :     (bl', allFlgs', (cvar, allFlgs') :: flgSets')
322 :     end
323 :     val (bl, _, flgSets) = gather (grp, bl, [], flgSets)
324 :     in
325 :     (bl, flgSets)
326 :     end
327 :     in
328 :     case grp
329 :     of (GRP _) => let
330 :     val unWrappedFlg = ref false
331 :     fun append ([], bl) = bl
332 :     | append (bev::r, bl) = append(r, (bev, unWrappedFlg)::bl)
333 :     fun gather (BASE_GRP bevs, bl, flgSets) =
334 :     (append(bevs, bl), flgSets)
335 :     | gather (GRP grp, bl, flgSets) = let
336 :     fun f (grp', (bl', flgSets')) =
337 :     gather(grp', bl', flgSets')
338 :     in
339 :     foldl f (bl, flgSets) grp
340 :     end
341 :     | gather (grp as NACK_GRP _, bl, flgSets) =
342 :     gatherWrapped (grp, bl, flgSets)
343 :     in
344 :     gather (grp, [], [])
345 :     end
346 :     | grp => gatherWrapped (grp, [], [])
347 :     (* end case *)
348 :     end
349 :    
350 :     (* this function handles the more complicated case of synchronization
351 :     * on groups of events where negative acknowledgements are involved.
352 :     *)
353 :     fun syncOnGrp grp = let
354 :     val (bl, flgSets) = collect grp
355 :     fun chkCVars () = let
356 :     fun chkCVar (cvar, flgs) = let
357 :     fun chkFlgs [] = ()
358 :     | chkFlgs ((ref true)::_) = atomicCVarSet cvar
359 :     | chkFlgs (_::r) = chkFlgs r
360 :     in
361 :     chkFlgs flgs
362 :     end
363 :     in
364 :     app chkCVar flgSets
365 :     end
366 :     fun ext ([], blockFns) = capture (fn k => let
367 :     val escape = escape k
368 :     val transId = ref(R.TRANS(S.getCurThread()))
369 :     fun setFlg flg () = (
370 :     transId := R.CANCEL; flg := true; chkCVars())
371 :     fun log [] = S.atomicDispatch ()
372 :     | log ((blockFn, flg) :: r) =
373 :     escape (blockFn {
374 :     transId = transId,
375 :     cleanUp = setFlg flg,
376 :     next = fn () => log r
377 :     })
378 :     in
379 :     log blockFns; error "[log]"
380 :     end)
381 :     | ext ((pollFn, flg) :: r, blockFns) = (case pollFn()
382 :     of ENABLED{prio, doFn} => extRdy (r, [(prio, (doFn, flg))], 1)
383 :     | (BLOCKED blockFn) => ext (r, (blockFn, flg)::blockFns)
384 :     (* end case *))
385 :     (** NOTE: maybe we should just keep track of the max priority?
386 :     ** What about fairness to fixed priority events (e.g., always, timeout?)
387 :     **)
388 :     and extRdy ([], doFns, n) = let
389 :     val (doFn, flg) = selectDoFn (doFns, n)
390 :     in
391 :     flg := true;
392 :     chkCVars ();
393 :     doFn()
394 :     end
395 :     | extRdy ((pollFn, flg) :: r, doFns, n) = (case pollFn()
396 :     of ENABLED{prio, doFn} =>
397 :     extRdy (r, (prio, (doFn, flg))::doFns, n+1)
398 :     | _ => extRdy (r, doFns, n)
399 :     (* end case *))
400 :     in
401 :     case bl
402 :     of [(bev, _)] => syncOnOneEvt bev
403 :     | _ => (S.atomicBegin(); ext (bl, []))
404 :     (* end case *)
405 :     end
406 :    
407 :     fun sync ev = (case (force ev)
408 :     of (BASE_GRP bevs) => syncOnBEvts bevs
409 :     | grp => syncOnGrp grp
410 :     (* end case *))
411 :    
412 :     fun select evts = let
413 :     fun forceBL ([], bevs) = BASE_GRP bevs
414 :     | forceBL (evt::r, bevs') = (case (force' evt)
415 :     of (BASE_GRP bevs) => forceBL (r, bevs @ bevs')
416 :     | (GRP grp) => forceL (r, grp @ [BASE_GRP bevs'])
417 :     | grp => forceL (r, [grp, BASE_GRP bevs'])
418 :     (* end case *))
419 :     and forceL ([], [grp]) = grp
420 :     | forceL ([], l) = GRP l
421 :     | forceL (evt :: r, l) = (
422 :     case (force' evt, l)
423 :     of (BASE_GRP bevs, BASE_GRP bevs' :: r') =>
424 :     forceL (r, BASE_GRP(bevs @ bevs') :: r')
425 :     | (GRP grp, l) => forceL (r, grp @ l)
426 :     | (grp, l) => forceL (r, grp :: l)
427 :     (* end case *))
428 :     and force' (GUARD g) = force' (g ())
429 :     | force' (W_NACK f) = let
430 :     val cvar = R.CVAR(ref(R.CVAR_unset []))
431 :     in
432 :     NACK_GRP(cvar, force' (f (cvarGetEvt cvar)))
433 :     end
434 :     | force' (BEVT grp) = BASE_GRP grp
435 :     | force' (CHOOSE evts) = forceBL (evts, [])
436 :     in
437 :     case forceBL(evts, [])
438 :     of (BASE_GRP bevs) => syncOnBEvts bevs
439 :     | grp => syncOnGrp grp
440 :     (* end case *)
441 :     end
442 :    
443 :     end;
444 :    

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0