Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cml/src/core-cml/event.sml
ViewVC logotype

Annotation of /sml/trunk/src/cml/src/core-cml/event.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 844 - (view) (download)

1 : monnier 2 (* event.sml
2 :     *
3 :     * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
4 :     * COPYRIGHT (c) 1989-1991 John H. Reppy
5 :     *
6 :     * The representation of event values and the event combinators.
7 :     *
8 :     * Some important requirements on the implementation of base event values:
9 :     *
10 :     * 1) The pollFn, doFn, and blockFn are always called from inside
11 :     * atomic regions.
12 :     *
13 :     * 2) The pollFn returns an integer priority: this is 0 when not enabled,
14 :     * ~1 for fixed priority, and a positive value for dynamic priority.
15 :     * The standard scheme is to associate a counter with the underlying
16 :     * synchronization object, and to increase it by one for each
17 :     * synchronization attempt.
18 :     *
19 :     * 3) The blockFn is responsible for exiting the atomic region; the doFns
20 :     * should NOT leave the atomic region.
21 :     *
22 :     * 4) The blockFn is responsible for executing the "cleanUp" action
23 :     * prior to leaving the atomic region.
24 :     *)
25 :    
26 :     structure Event : sig
27 :    
28 :     include EVENT
29 :    
30 :     val atomicCVarSet : RepTypes.cvar -> unit
31 :     val cvarGetEvt : RepTypes.cvar -> unit event
32 :    
33 :     end = struct
34 :    
35 :     structure R = RepTypes
36 :     structure S = Scheduler
37 :    
38 :     val capture = SMLofNJ.Cont.capture
39 :     val escape = SMLofNJ.Cont.escape
40 :     val callcc = SMLofNJ.Cont.callcc
41 :     val throw = SMLofNJ.Cont.throw
42 :    
43 :     (* Some inline functions to improve performance *)
44 :     fun map f = let
45 :     fun mapf [] = []
46 :     | mapf [a] = [f a]
47 :     | mapf [a, b] = [f a, f b]
48 :     | mapf [a, b, c] = [f a, f b, f c]
49 :     | mapf (a::b::c::d::r) = (f a)::(f b)::(f c)::(f d)::(mapf r)
50 :     in
51 :     mapf
52 :     end
53 :     fun app f = let
54 :     fun appf [] = ()
55 :     | appf (x::r) = (f x; appf r)
56 :     in
57 :     appf
58 :     end
59 :     fun foldl f init l = let
60 :     fun foldf ([], accum) = accum
61 :     | foldf (x::r, accum) = foldf(r, f(x, accum))
62 :     in
63 :     foldf (l, init)
64 :     end
65 :    
66 :     fun error msg = raise Fail msg
67 :    
68 : monnier 8 datatype event_status = datatype RepTypes.event_status
69 :     type 'a base_evt = 'a RepTypes.base_evt
70 :     datatype event = datatype RepTypes.event
71 : monnier 2
72 :    
73 :     (** Condition variables. Because these variables are set inside atomic
74 :     ** regions, we have to use different conventions for clean-up, etc.
75 :     ** Instead of requiring the blockFn continuation to call the cleanUp
76 :     ** action and to leave the atomic region, we call the cleanUp function
77 :     ** when setting the condition variable (in atomicCVarSet), and have the
78 :     ** invariant that the blockFn continuation is dispatched outside the
79 :     ** atomic region.
80 :     **)
81 :    
82 :     (* set a condition variable; we assume that this function is always
83 :     * executed in an atomic region.
84 :     *)
85 : blume 844 fun atomicCVarSet (R.CVAR state) = (
86 :     case !state
87 : monnier 8 of (R.CVAR_unset waiting) => let
88 :     val R.Q{rear, ...} = S.rdyQ1
89 :     fun add [] = !rear
90 :     | add ({transId=ref R.CANCEL, ...}::r) = add r
91 :     | add ({transId as ref(R.TRANS tid), cleanUp, kont}::r) = (
92 :     transId := R.CANCEL;
93 :     cleanUp();
94 :     (tid, kont) :: (add r))
95 :     in
96 :     state := R.CVAR_set 1;
97 :     rear := add waiting
98 :     end
99 :     | _ => error "cvar already set"
100 :     (* end case *))
101 : monnier 2
102 :     (* the event constructor for waiting on a cvar *)
103 :     fun cvarGetEvt (R.CVAR(state)) = let
104 :     fun blockFn {transId, cleanUp, next} = callcc (fn k => let
105 :     val (R.CVAR_unset waiting) = !state
106 :     val item = {transId=transId, cleanUp=cleanUp, kont=k}
107 :     in
108 :     state := R.CVAR_unset(item :: waiting);
109 :     next ()
110 :     end)
111 :     fun pollFn () = (case !state
112 : monnier 8 of (R.CVAR_set n) => let
113 :     fun doFn () = (state := R.CVAR_set 1; S.atomicEnd())
114 :     in
115 :     state := R.CVAR_set(n+1);
116 :     ENABLED{prio=n, doFn=doFn}
117 :     end
118 : monnier 2 | _ => BLOCKED blockFn
119 :     (* end case *))
120 :     in
121 :     BEVT[pollFn]
122 :     end
123 :    
124 :    
125 :     fun alwaysEvt v = BEVT[fn () => R.ENABLED{
126 :     prio= ~1, doFn=fn () => (S.atomicEnd(); v)
127 :     }]
128 :    
129 :     val never = BEVT[]
130 :    
131 :     val guard = GUARD
132 :    
133 :     val withNack = W_NACK
134 :    
135 :     fun choose (el : 'a event list) = let
136 :     fun gatherBEvts ([], l) = BEVT l
137 :     | gatherBEvts (BEVT[] :: r, l) = gatherBEvts (r, l)
138 :     | gatherBEvts (BEVT[bev] :: r, bevs') = gatherBEvts (r, bev::bevs')
139 :     | gatherBEvts (BEVT bevs :: r, bevs') = gatherBEvts (r, bevs @ bevs')
140 :     | gatherBEvts (evts, []) = gather (evts, [])
141 :     | gatherBEvts (evts, l) = gather (evts, [BEVT l])
142 :     and gather ([], [evt]) = evt
143 :     | gather ([], evts) = CHOOSE evts
144 :     | gather (CHOOSE evts :: r, evts') = gather (r, evts @ evts')
145 :     | gather (BEVT bevs :: r, BEVT bevs' :: r')
146 :     = gather (r, BEVT(bevs @ bevs') :: r')
147 :     | gather (evt :: r, evts') = gather (r, evt :: evts')
148 :     in
149 :     gatherBEvts (rev el, [])
150 :     end
151 :    
152 :     fun wrap (evt, wfn) = let
153 :     fun wrapBaseEvt pollFn () = (case pollFn()
154 :     of ENABLED{prio, doFn} => ENABLED{prio=prio, doFn = wfn o doFn}
155 :     | (BLOCKED blockFn) => BLOCKED(wfn o blockFn)
156 :     (* end case *))
157 :     fun wrap' (BEVT bevs) = BEVT(map wrapBaseEvt bevs)
158 :     | wrap' (CHOOSE evts) = CHOOSE(map wrap' evts)
159 :     | wrap' (GUARD g) = GUARD(fn () => wrap(g(), wfn))
160 :     | wrap' (W_NACK f) = W_NACK(fn evt => wrap(f evt, wfn))
161 :     in
162 :     wrap' evt
163 :     end
164 :    
165 :     fun wrapHandler (evt, hfn) = let
166 :     fun wrap f x = ((f x) handle exn => hfn exn)
167 :     fun wrapBaseEvt pollFn () = (case pollFn()
168 :     of ENABLED{prio, doFn} => ENABLED{prio=prio, doFn = wrap doFn}
169 :     | (BLOCKED blockFn) => BLOCKED(wrap blockFn)
170 :     (* end case *))
171 :     fun wrap' (BEVT bevs) = BEVT(map wrapBaseEvt bevs)
172 :     | wrap' (CHOOSE evts) = CHOOSE(map wrap' evts)
173 :     | wrap' (GUARD g) = GUARD(fn () => wrapHandler(g(), hfn))
174 :     | wrap' (W_NACK f) = W_NACK(fn evt => wrapHandler(f evt, hfn))
175 :     in
176 :     wrap' evt
177 :     end
178 :    
179 :     datatype 'a event_group
180 :     = BASE_GRP of 'a base_evt list
181 :     | GRP of 'a event_group list
182 :     | NACK_GRP of (R.cvar * 'a event_group)
183 :    
184 : blume 844 (*+DEBUG
185 :     fun sayGrp (msg, eg) = let
186 :     fun f (BASE_GRP l, sl) = "BASE_GRP("::Int.toString(List.length l)::")"::sl
187 :     | f (GRP l, sl) = "GRP(" :: g(l, ")"::sl)
188 :     | f (NACK_GRP l, sl) = "NACK_GRP(" :: f(#2 l, ")"::sl)
189 :     and g ([], sl) = sl
190 :     | g ([x], sl) = f(x, sl)
191 :     | g (x::r, sl) = f(x, "," :: g(r, sl))
192 :     in
193 :     Debug.sayDebugId(String.concat(msg :: ": " :: f(eg, ["\n"])))
194 :     end
195 :     -DEBUG*)
196 :    
197 : monnier 2 (* force the evaluation of any guards in an event group. *)
198 :     fun force (BEVT l) = BASE_GRP l
199 :     | force evt = let
200 :     fun force' (GUARD g) = force' (g ())
201 :     | force' (W_NACK f) = let
202 :     val cvar = R.CVAR(ref(R.CVAR_unset []))
203 :     in
204 :     NACK_GRP(cvar, force' (f (cvarGetEvt cvar)))
205 :     end
206 :     | force' (BEVT grp) = BASE_GRP grp
207 :     | force' (CHOOSE evts) = let
208 :     fun forceBL ([], bevs) = BASE_GRP bevs
209 :     | forceBL (evt::r, bevs') = (case (force' evt)
210 :     of (BASE_GRP bevs) => forceBL (r, bevs @ bevs')
211 :     | (GRP grp) => forceL (r, grp @ [BASE_GRP bevs'])
212 :     | grp => forceL (r, [grp, BASE_GRP bevs'])
213 :     (* end case *))
214 :     and forceL ([], [grp]) = grp
215 :     | forceL ([], l) = GRP l
216 :     | forceL (evt :: r, l) = (
217 :     case (force' evt, l)
218 :     of (BASE_GRP bevs, BASE_GRP bevs' :: r') =>
219 :     forceL (r, BASE_GRP(bevs @ bevs') :: r')
220 :     | (GRP grp, l) => forceL (r, grp @ l)
221 :     | (grp, l) => forceL (r, grp :: l)
222 :     (* end case *))
223 :     in
224 :     forceBL (evts, [])
225 :     end
226 :     in
227 :     force' evt
228 :     end
229 :    
230 :     local
231 :     val cnt = ref 0
232 :     fun random i = let val j = !cnt
233 :     in
234 :     if (j = 1000000) then cnt := 0 else cnt := j+1;
235 :     Int.rem(j, i)
236 :     end
237 :     in
238 :     fun selectDoFn ([(_, doFn)], _) = doFn
239 :     | selectDoFn (l, n) = let
240 :     fun priority ~1 = n
241 :     | priority p = p
242 :     fun max ((p, doFn)::r, maxP, k, doFns) = let
243 :     val p = priority p
244 :     in
245 :     if (p > maxP) then max(r, p, 1, [doFn])
246 :     else if (p = maxP) then max(r, maxP, k+1, doFn::doFns)
247 :     else max(r, maxP, k, doFns)
248 :     end
249 :     | max ([], _, k, [doFn]) = doFn
250 :     | max ([], _, k, doFns) = List.nth(doFns, random k)
251 :     in
252 :     max (l, 0, 0, [])
253 :     end
254 :     end
255 :    
256 :     fun mkFlg () = let val flg = ref(R.TRANS(S.getCurThread()))
257 :     in
258 :     (flg, fn () => flg := R.CANCEL)
259 :     end
260 :    
261 :     fun syncOnOneEvt (pollFn : 'a base_evt) = (
262 :     S.atomicBegin ();
263 :     case (pollFn())
264 :     of ENABLED{doFn, ...} => doFn()
265 :     | (BLOCKED blockFn) => let val (flg, setFlg) = mkFlg()
266 :     in
267 :     blockFn{transId=flg, cleanUp=setFlg, next=S.atomicDispatch}
268 :     end
269 :     (* end case *))
270 :    
271 :     (* this function handles the case of synchronizing on a list of
272 :     * base events (w/o any negative acknowledgements). It also handles
273 :     * the case of synchronizing on NEVER.
274 :     *)
275 :     fun syncOnBEvts [] = S.dispatch()
276 :     | syncOnBEvts [bev] = syncOnOneEvt bev
277 :     | syncOnBEvts bevs = let
278 :     fun ext ([], blockFns) = capture (fn k => let
279 :     val escape = escape k
280 :     val (transId, setFlg) = mkFlg()
281 :     fun log [] = S.atomicDispatch ()
282 :     | log (blockFn :: r) =
283 :     escape (blockFn {
284 :     transId = transId,
285 :     cleanUp = setFlg,
286 :     next = fn () => log r
287 :     })
288 :     in
289 :     log blockFns; error "[log]"
290 :     end)
291 :     | ext (pollFn :: r, blockFns) = (case pollFn()
292 :     of ENABLED{prio, doFn} => extRdy (r, [(prio, doFn)], 1)
293 :     | (BLOCKED blockFn) => ext (r, blockFn::blockFns)
294 :     (* end case *))
295 :     (** NOTE: maybe we should just keep track of the max priority?
296 :     ** What about fairness to fixed priority events (e.g., always, timeout?)
297 :     **)
298 :     and extRdy ([], doFns, n) = selectDoFn (doFns, n) ()
299 :     | extRdy (pollFn :: r, doFns, n) = (case pollFn()
300 :     of ENABLED{prio, doFn} => extRdy (r, (prio, doFn)::doFns, n+1)
301 :     | _ => extRdy (r, doFns, n)
302 :     (* end case *))
303 :     in
304 :     S.atomicBegin();
305 :     ext (bevs, [])
306 :     end
307 :    
308 :     (* walk the event group tree, collecting the base events (with associated
309 : blume 844 * ack flags), and a list of flag sets. A flag set is a (cvar * ack flag list)
310 :     * pairs, where the flags are those associated with the events covered by the
311 :     * nack cvar.
312 : monnier 2 *)
313 :     fun collect grp = let
314 :     val unWrappedFlg = ref false
315 :     fun gatherWrapped (grp, bl, flgSets) = let
316 :     fun gather (BASE_GRP bevs, bl, allFlgs, flgSets) = let
317 :     fun append ([], bl, allFlgs) = (bl, allFlgs)
318 :     | append (bev::r, bl, allFlgs) = let
319 :     val flg = ref false
320 :     in
321 :     append (r, (bev, flg)::bl, flg::allFlgs)
322 :     end
323 : blume 844 val (bl', allFlgs') = append (bevs, bl, allFlgs)
324 : monnier 2 in
325 :     (bl', allFlgs', flgSets)
326 :     end
327 :     | gather (GRP grp, bl, allFlgs, flgSets) = let
328 :     fun f (grp', (bl', allFlgs', flgSets')) =
329 :     gather (grp', bl', allFlgs', flgSets')
330 :     in
331 :     foldl f (bl, allFlgs, flgSets) grp
332 :     end
333 :     | gather (NACK_GRP(cvar, grp), bl, allFlgs, flgSets) = let
334 :     val (bl', allFlgs', flgSets') =
335 : blume 844 gather (grp, bl, [], flgSets)
336 : monnier 2 in
337 : blume 844 (bl', allFlgs' @ allFlgs, (cvar, allFlgs') :: flgSets')
338 : monnier 2 end
339 :     val (bl, _, flgSets) = gather (grp, bl, [], flgSets)
340 :     in
341 :     (bl, flgSets)
342 :     end
343 :     in
344 :     case grp
345 :     of (GRP _) => let
346 :     val unWrappedFlg = ref false
347 :     fun append ([], bl) = bl
348 :     | append (bev::r, bl) = append(r, (bev, unWrappedFlg)::bl)
349 :     fun gather (BASE_GRP bevs, bl, flgSets) =
350 :     (append(bevs, bl), flgSets)
351 :     | gather (GRP grp, bl, flgSets) = let
352 :     fun f (grp', (bl', flgSets')) =
353 :     gather(grp', bl', flgSets')
354 :     in
355 :     foldl f (bl, flgSets) grp
356 :     end
357 :     | gather (grp as NACK_GRP _, bl, flgSets) =
358 :     gatherWrapped (grp, bl, flgSets)
359 :     in
360 :     gather (grp, [], [])
361 :     end
362 :     | grp => gatherWrapped (grp, [], [])
363 :     (* end case *)
364 :     end
365 :    
366 :     (* this function handles the more complicated case of synchronization
367 :     * on groups of events where negative acknowledgements are involved.
368 :     *)
369 :     fun syncOnGrp grp = let
370 :     val (bl, flgSets) = collect grp
371 :     fun chkCVars () = let
372 : blume 844 (* chkCVar checks the flags of a flag set. If they are all false
373 :     * then the corresponding cvar is set to signal the negative ack.
374 :     *)
375 : monnier 2 fun chkCVar (cvar, flgs) = let
376 : blume 844 fun chkFlgs [] = atomicCVarSet cvar
377 :     | chkFlgs ((ref true)::_) = ()
378 : monnier 2 | chkFlgs (_::r) = chkFlgs r
379 :     in
380 :     chkFlgs flgs
381 :     end
382 :     in
383 :     app chkCVar flgSets
384 :     end
385 :     fun ext ([], blockFns) = capture (fn k => let
386 :     val escape = escape k
387 :     val transId = ref(R.TRANS(S.getCurThread()))
388 :     fun setFlg flg () = (
389 :     transId := R.CANCEL; flg := true; chkCVars())
390 :     fun log [] = S.atomicDispatch ()
391 :     | log ((blockFn, flg) :: r) =
392 :     escape (blockFn {
393 :     transId = transId,
394 :     cleanUp = setFlg flg,
395 :     next = fn () => log r
396 :     })
397 :     in
398 :     log blockFns; error "[log]"
399 :     end)
400 :     | ext ((pollFn, flg) :: r, blockFns) = (case pollFn()
401 :     of ENABLED{prio, doFn} => extRdy (r, [(prio, (doFn, flg))], 1)
402 :     | (BLOCKED blockFn) => ext (r, (blockFn, flg)::blockFns)
403 :     (* end case *))
404 :     (** NOTE: maybe we should just keep track of the max priority?
405 :     ** What about fairness to fixed priority events (e.g., always, timeout?)
406 :     **)
407 :     and extRdy ([], doFns, n) = let
408 :     val (doFn, flg) = selectDoFn (doFns, n)
409 :     in
410 :     flg := true;
411 :     chkCVars ();
412 :     doFn()
413 :     end
414 :     | extRdy ((pollFn, flg) :: r, doFns, n) = (case pollFn()
415 :     of ENABLED{prio, doFn} =>
416 :     extRdy (r, (prio, (doFn, flg))::doFns, n+1)
417 :     | _ => extRdy (r, doFns, n)
418 :     (* end case *))
419 :     in
420 :     case bl
421 :     of [(bev, _)] => syncOnOneEvt bev
422 :     | _ => (S.atomicBegin(); ext (bl, []))
423 :     (* end case *)
424 :     end
425 :    
426 :     fun sync ev = (case (force ev)
427 :     of (BASE_GRP bevs) => syncOnBEvts bevs
428 :     | grp => syncOnGrp grp
429 :     (* end case *))
430 :    
431 :     fun select evts = let
432 :     fun forceBL ([], bevs) = BASE_GRP bevs
433 :     | forceBL (evt::r, bevs') = (case (force' evt)
434 :     of (BASE_GRP bevs) => forceBL (r, bevs @ bevs')
435 :     | (GRP grp) => forceL (r, grp @ [BASE_GRP bevs'])
436 :     | grp => forceL (r, [grp, BASE_GRP bevs'])
437 :     (* end case *))
438 :     and forceL ([], [grp]) = grp
439 :     | forceL ([], l) = GRP l
440 :     | forceL (evt :: r, l) = (
441 :     case (force' evt, l)
442 :     of (BASE_GRP bevs, BASE_GRP bevs' :: r') =>
443 :     forceL (r, BASE_GRP(bevs @ bevs') :: r')
444 :     | (GRP grp, l) => forceL (r, grp @ l)
445 :     | (grp, l) => forceL (r, grp :: l)
446 :     (* end case *))
447 :     and force' (GUARD g) = force' (g ())
448 :     | force' (W_NACK f) = let
449 :     val cvar = R.CVAR(ref(R.CVAR_unset []))
450 :     in
451 :     NACK_GRP(cvar, force' (f (cvarGetEvt cvar)))
452 :     end
453 :     | force' (BEVT grp) = BASE_GRP grp
454 :     | force' (CHOOSE evts) = forceBL (evts, [])
455 :     in
456 :     case forceBL(evts, [])
457 :     of (BASE_GRP bevs) => syncOnBEvts bevs
458 :     | grp => syncOnGrp grp
459 :     (* end case *)
460 :     end
461 :    
462 :     end;
463 :    

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0