Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cml/src/core-cml/event.sml
ViewVC logotype

Annotation of /sml/trunk/src/cml/src/core-cml/event.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2 - (view) (download)

1 : monnier 2 (* event.sml
2 :     *
3 :     * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
4 :     * COPYRIGHT (c) 1989-1991 John H. Reppy
5 :     *
6 :     * The representation of event values and the event combinators.
7 :     *
8 :     * Some important requirements on the implementation of base event values:
9 :     *
10 :     * 1) The pollFn, doFn, and blockFn are always called from inside
11 :     * atomic regions.
12 :     *
13 :     * 2) The pollFn returns an integer priority: this is 0 when not enabled,
14 :     * ~1 for fixed priority, and a positive value for dynamic priority.
15 :     * The standard scheme is to associate a counter with the underlying
16 :     * synchronization object, and to increase it by one for each
17 :     * synchronization attempt.
18 :     *
19 :     * 3) The blockFn is responsible for exiting the atomic region; the doFns
20 :     * should NOT leave the atomic region.
21 :     *
22 :     * 4) The blockFn is responsible for executing the "cleanUp" action
23 :     * prior to leaving the atomic region.
24 :     *)
25 :    
26 :     structure Event : sig
27 :    
28 :     include EVENT
29 :    
30 :     val atomicCVarSet : RepTypes.cvar -> unit
31 :     val cvarGetEvt : RepTypes.cvar -> unit event
32 :    
33 :     end = struct
34 :    
35 :     structure R = RepTypes
36 :     structure S = Scheduler
37 :    
38 :     val capture = SMLofNJ.Cont.capture
39 :     val escape = SMLofNJ.Cont.escape
40 :     val callcc = SMLofNJ.Cont.callcc
41 :     val throw = SMLofNJ.Cont.throw
42 :    
43 :     (* Some inline functions to improve performance *)
44 :     fun map f = let
45 :     fun mapf [] = []
46 :     | mapf [a] = [f a]
47 :     | mapf [a, b] = [f a, f b]
48 :     | mapf [a, b, c] = [f a, f b, f c]
49 :     | mapf (a::b::c::d::r) = (f a)::(f b)::(f c)::(f d)::(mapf r)
50 :     in
51 :     mapf
52 :     end
53 :     fun app f = let
54 :     fun appf [] = ()
55 :     | appf (x::r) = (f x; appf r)
56 :     in
57 :     appf
58 :     end
59 :     fun foldl f init l = let
60 :     fun foldf ([], accum) = accum
61 :     | foldf (x::r, accum) = foldf(r, f(x, accum))
62 :     in
63 :     foldf (l, init)
64 :     end
65 :    
66 :     fun error msg = raise Fail msg
67 :    
68 :     structure Rep : sig
69 :     datatype 'a event_status
70 :     = ENABLED of {prio : int, doFn : unit -> 'a}
71 :     | BLOCKED of {
72 :     transId : R.trans_id ref, cleanUp : unit -> unit, next : unit -> unit
73 :     } -> 'a
74 :     type 'a base_evt (* = unit -> 'a event_status *)
75 :     datatype 'a event
76 :     = BEVT of 'a base_evt list
77 :     | CHOOSE of 'a event list
78 :     | GUARD of unit -> 'a event
79 :     | W_NACK of unit event -> 'a event
80 :     end = RepTypes
81 :     open Rep
82 :    
83 :    
84 :     (** Condition variables. Because these variables are set inside atomic
85 :     ** regions, we have to use different conventions for clean-up, etc.
86 :     ** Instead of requiring the blockFn continuation to call the cleanUp
87 :     ** action and to leave the atomic region, we call the cleanUp function
88 :     ** when setting the condition variable (in atomicCVarSet), and have the
89 :     ** invariant that the blockFn continuation is dispatched outside the
90 :     ** atomic region.
91 :     **)
92 :    
93 :     (* set a condition variable; we assume that this function is always
94 :     * executed in an atomic region.
95 :     *)
96 :     fun atomicCVarSet (R.CVAR(ref(R.CVAR_unset waiting))) = let
97 :     val R.Q{rear, ...} = S.rdyQ1
98 :     fun add [] = !rear
99 :     | add ({transId=ref R.CANCEL, ...}::r) = add r
100 :     | add ({transId as ref(R.TRANS tid), cleanUp, kont}::r) = (
101 :     transId := R.CANCEL;
102 :     cleanUp();
103 :     (tid, kont) :: (add r))
104 :     in
105 :     rear := add waiting
106 :     end
107 :     | atomicCVarSet (R.CVAR _) = error "cvar already set"
108 :    
109 :     (* the event constructor for waiting on a cvar *)
110 :     fun cvarGetEvt (R.CVAR(state)) = let
111 :     fun blockFn {transId, cleanUp, next} = callcc (fn k => let
112 :     val (R.CVAR_unset waiting) = !state
113 :     val item = {transId=transId, cleanUp=cleanUp, kont=k}
114 :     in
115 :     state := R.CVAR_unset(item :: waiting);
116 :     next ()
117 :     end)
118 :     fun pollFn () = (case !state
119 :     of (R.CVAR_set n) => (
120 :     state := R.CVAR_set(n+1);
121 :     ENABLED{prio=n, doFn=S.atomicEnd})
122 :     | _ => BLOCKED blockFn
123 :     (* end case *))
124 :     in
125 :     BEVT[pollFn]
126 :     end
127 :    
128 :    
129 :     fun alwaysEvt v = BEVT[fn () => R.ENABLED{
130 :     prio= ~1, doFn=fn () => (S.atomicEnd(); v)
131 :     }]
132 :    
133 :     val never = BEVT[]
134 :    
135 :     val guard = GUARD
136 :    
137 :     val withNack = W_NACK
138 :    
139 :     fun choose (el : 'a event list) = let
140 :     fun gatherBEvts ([], l) = BEVT l
141 :     | gatherBEvts (BEVT[] :: r, l) = gatherBEvts (r, l)
142 :     | gatherBEvts (BEVT[bev] :: r, bevs') = gatherBEvts (r, bev::bevs')
143 :     | gatherBEvts (BEVT bevs :: r, bevs') = gatherBEvts (r, bevs @ bevs')
144 :     | gatherBEvts (evts, []) = gather (evts, [])
145 :     | gatherBEvts (evts, l) = gather (evts, [BEVT l])
146 :     and gather ([], [evt]) = evt
147 :     | gather ([], evts) = CHOOSE evts
148 :     | gather (CHOOSE evts :: r, evts') = gather (r, evts @ evts')
149 :     | gather (BEVT bevs :: r, BEVT bevs' :: r')
150 :     = gather (r, BEVT(bevs @ bevs') :: r')
151 :     | gather (evt :: r, evts') = gather (r, evt :: evts')
152 :     in
153 :     gatherBEvts (rev el, [])
154 :     end
155 :    
156 :     fun wrap (evt, wfn) = let
157 :     fun wrapBaseEvt pollFn () = (case pollFn()
158 :     of ENABLED{prio, doFn} => ENABLED{prio=prio, doFn = wfn o doFn}
159 :     | (BLOCKED blockFn) => BLOCKED(wfn o blockFn)
160 :     (* end case *))
161 :     fun wrap' (BEVT bevs) = BEVT(map wrapBaseEvt bevs)
162 :     | wrap' (CHOOSE evts) = CHOOSE(map wrap' evts)
163 :     | wrap' (GUARD g) = GUARD(fn () => wrap(g(), wfn))
164 :     | wrap' (W_NACK f) = W_NACK(fn evt => wrap(f evt, wfn))
165 :     in
166 :     wrap' evt
167 :     end
168 :    
169 :     fun wrapHandler (evt, hfn) = let
170 :     fun wrap f x = ((f x) handle exn => hfn exn)
171 :     fun wrapBaseEvt pollFn () = (case pollFn()
172 :     of ENABLED{prio, doFn} => ENABLED{prio=prio, doFn = wrap doFn}
173 :     | (BLOCKED blockFn) => BLOCKED(wrap blockFn)
174 :     (* end case *))
175 :     fun wrap' (BEVT bevs) = BEVT(map wrapBaseEvt bevs)
176 :     | wrap' (CHOOSE evts) = CHOOSE(map wrap' evts)
177 :     | wrap' (GUARD g) = GUARD(fn () => wrapHandler(g(), hfn))
178 :     | wrap' (W_NACK f) = W_NACK(fn evt => wrapHandler(f evt, hfn))
179 :     in
180 :     wrap' evt
181 :     end
182 :    
183 :     datatype 'a event_group
184 :     = BASE_GRP of 'a base_evt list
185 :     | GRP of 'a event_group list
186 :     | NACK_GRP of (R.cvar * 'a event_group)
187 :    
188 :     (* force the evaluation of any guards in an event group. *)
189 :     fun force (BEVT l) = BASE_GRP l
190 :     | force evt = let
191 :     fun force' (GUARD g) = force' (g ())
192 :     | force' (W_NACK f) = let
193 :     val cvar = R.CVAR(ref(R.CVAR_unset []))
194 :     in
195 :     NACK_GRP(cvar, force' (f (cvarGetEvt cvar)))
196 :     end
197 :     | force' (BEVT grp) = BASE_GRP grp
198 :     | force' (CHOOSE evts) = let
199 :     fun forceBL ([], bevs) = BASE_GRP bevs
200 :     | forceBL (evt::r, bevs') = (case (force' evt)
201 :     of (BASE_GRP bevs) => forceBL (r, bevs @ bevs')
202 :     | (GRP grp) => forceL (r, grp @ [BASE_GRP bevs'])
203 :     | grp => forceL (r, [grp, BASE_GRP bevs'])
204 :     (* end case *))
205 :     and forceL ([], [grp]) = grp
206 :     | forceL ([], l) = GRP l
207 :     | forceL (evt :: r, l) = (
208 :     case (force' evt, l)
209 :     of (BASE_GRP bevs, BASE_GRP bevs' :: r') =>
210 :     forceL (r, BASE_GRP(bevs @ bevs') :: r')
211 :     | (GRP grp, l) => forceL (r, grp @ l)
212 :     | (grp, l) => forceL (r, grp :: l)
213 :     (* end case *))
214 :     in
215 :     forceBL (evts, [])
216 :     end
217 :     in
218 :     force' evt
219 :     end
220 :    
221 :     local
222 :     val cnt = ref 0
223 :     fun random i = let val j = !cnt
224 :     in
225 :     if (j = 1000000) then cnt := 0 else cnt := j+1;
226 :     Int.rem(j, i)
227 :     end
228 :     in
229 :     fun selectDoFn ([(_, doFn)], _) = doFn
230 :     | selectDoFn (l, n) = let
231 :     fun priority ~1 = n
232 :     | priority p = p
233 :     fun max ((p, doFn)::r, maxP, k, doFns) = let
234 :     val p = priority p
235 :     in
236 :     if (p > maxP) then max(r, p, 1, [doFn])
237 :     else if (p = maxP) then max(r, maxP, k+1, doFn::doFns)
238 :     else max(r, maxP, k, doFns)
239 :     end
240 :     | max ([], _, k, [doFn]) = doFn
241 :     | max ([], _, k, doFns) = List.nth(doFns, random k)
242 :     in
243 :     max (l, 0, 0, [])
244 :     end
245 :     end
246 :    
247 :     fun mkFlg () = let val flg = ref(R.TRANS(S.getCurThread()))
248 :     in
249 :     (flg, fn () => flg := R.CANCEL)
250 :     end
251 :    
252 :     fun syncOnOneEvt (pollFn : 'a base_evt) = (
253 :     S.atomicBegin ();
254 :     case (pollFn())
255 :     of ENABLED{doFn, ...} => doFn()
256 :     | (BLOCKED blockFn) => let val (flg, setFlg) = mkFlg()
257 :     in
258 :     blockFn{transId=flg, cleanUp=setFlg, next=S.atomicDispatch}
259 :     end
260 :     (* end case *))
261 :    
262 :     (* this function handles the case of synchronizing on a list of
263 :     * base events (w/o any negative acknowledgements). It also handles
264 :     * the case of synchronizing on NEVER.
265 :     *)
266 :     fun syncOnBEvts [] = S.dispatch()
267 :     | syncOnBEvts [bev] = syncOnOneEvt bev
268 :     | syncOnBEvts bevs = let
269 :     fun ext ([], blockFns) = capture (fn k => let
270 :     val escape = escape k
271 :     val (transId, setFlg) = mkFlg()
272 :     fun log [] = S.atomicDispatch ()
273 :     | log (blockFn :: r) =
274 :     escape (blockFn {
275 :     transId = transId,
276 :     cleanUp = setFlg,
277 :     next = fn () => log r
278 :     })
279 :     in
280 :     log blockFns; error "[log]"
281 :     end)
282 :     | ext (pollFn :: r, blockFns) = (case pollFn()
283 :     of ENABLED{prio, doFn} => extRdy (r, [(prio, doFn)], 1)
284 :     | (BLOCKED blockFn) => ext (r, blockFn::blockFns)
285 :     (* end case *))
286 :     (** NOTE: maybe we should just keep track of the max priority?
287 :     ** What about fairness to fixed priority events (e.g., always, timeout?)
288 :     **)
289 :     and extRdy ([], doFns, n) = selectDoFn (doFns, n) ()
290 :     | extRdy (pollFn :: r, doFns, n) = (case pollFn()
291 :     of ENABLED{prio, doFn} => extRdy (r, (prio, doFn)::doFns, n+1)
292 :     | _ => extRdy (r, doFns, n)
293 :     (* end case *))
294 :     in
295 :     S.atomicBegin();
296 :     ext (bevs, [])
297 :     end
298 :    
299 :     (* walk the event group tree, collecting the base events (with associated
300 :     * ack flags), and a list of (cvar * ack flag set) pairs.
301 :     *)
302 :     fun collect grp = let
303 :     val unWrappedFlg = ref false
304 :     fun gatherWrapped (grp, bl, flgSets) = let
305 :     fun gather (BASE_GRP bevs, bl, allFlgs, flgSets) = let
306 :     fun append ([], bl, allFlgs) = (bl, allFlgs)
307 :     | append (bev::r, bl, allFlgs) = let
308 :     val flg = ref false
309 :     in
310 :     append (r, (bev, flg)::bl, flg::allFlgs)
311 :     end
312 :     val (bl', allFlgs') = append (bevs, [], allFlgs)
313 :     in
314 :     (bl', allFlgs', flgSets)
315 :     end
316 :     | gather (GRP grp, bl, allFlgs, flgSets) = let
317 :     fun f (grp', (bl', allFlgs', flgSets')) =
318 :     gather (grp', bl', allFlgs', flgSets')
319 :     in
320 :     foldl f (bl, allFlgs, flgSets) grp
321 :     end
322 :     | gather (NACK_GRP(cvar, grp), bl, allFlgs, flgSets) = let
323 :     val (bl', allFlgs', flgSets') =
324 :     gather (grp, bl, allFlgs, flgSets)
325 :     in
326 :     (bl', allFlgs', (cvar, allFlgs') :: flgSets')
327 :     end
328 :     val (bl, _, flgSets) = gather (grp, bl, [], flgSets)
329 :     in
330 :     (bl, flgSets)
331 :     end
332 :     in
333 :     case grp
334 :     of (GRP _) => let
335 :     val unWrappedFlg = ref false
336 :     fun append ([], bl) = bl
337 :     | append (bev::r, bl) = append(r, (bev, unWrappedFlg)::bl)
338 :     fun gather (BASE_GRP bevs, bl, flgSets) =
339 :     (append(bevs, bl), flgSets)
340 :     | gather (GRP grp, bl, flgSets) = let
341 :     fun f (grp', (bl', flgSets')) =
342 :     gather(grp', bl', flgSets')
343 :     in
344 :     foldl f (bl, flgSets) grp
345 :     end
346 :     | gather (grp as NACK_GRP _, bl, flgSets) =
347 :     gatherWrapped (grp, bl, flgSets)
348 :     in
349 :     gather (grp, [], [])
350 :     end
351 :     | grp => gatherWrapped (grp, [], [])
352 :     (* end case *)
353 :     end
354 :    
355 :     (* this function handles the more complicated case of synchronization
356 :     * on groups of events where negative acknowledgements are involved.
357 :     *)
358 :     fun syncOnGrp grp = let
359 :     val (bl, flgSets) = collect grp
360 :     fun chkCVars () = let
361 :     fun chkCVar (cvar, flgs) = let
362 :     fun chkFlgs [] = ()
363 :     | chkFlgs ((ref true)::_) = atomicCVarSet cvar
364 :     | chkFlgs (_::r) = chkFlgs r
365 :     in
366 :     chkFlgs flgs
367 :     end
368 :     in
369 :     app chkCVar flgSets
370 :     end
371 :     fun ext ([], blockFns) = capture (fn k => let
372 :     val escape = escape k
373 :     val transId = ref(R.TRANS(S.getCurThread()))
374 :     fun setFlg flg () = (
375 :     transId := R.CANCEL; flg := true; chkCVars())
376 :     fun log [] = S.atomicDispatch ()
377 :     | log ((blockFn, flg) :: r) =
378 :     escape (blockFn {
379 :     transId = transId,
380 :     cleanUp = setFlg flg,
381 :     next = fn () => log r
382 :     })
383 :     in
384 :     log blockFns; error "[log]"
385 :     end)
386 :     | ext ((pollFn, flg) :: r, blockFns) = (case pollFn()
387 :     of ENABLED{prio, doFn} => extRdy (r, [(prio, (doFn, flg))], 1)
388 :     | (BLOCKED blockFn) => ext (r, (blockFn, flg)::blockFns)
389 :     (* end case *))
390 :     (** NOTE: maybe we should just keep track of the max priority?
391 :     ** What about fairness to fixed priority events (e.g., always, timeout?)
392 :     **)
393 :     and extRdy ([], doFns, n) = let
394 :     val (doFn, flg) = selectDoFn (doFns, n)
395 :     in
396 :     flg := true;
397 :     chkCVars ();
398 :     doFn()
399 :     end
400 :     | extRdy ((pollFn, flg) :: r, doFns, n) = (case pollFn()
401 :     of ENABLED{prio, doFn} =>
402 :     extRdy (r, (prio, (doFn, flg))::doFns, n+1)
403 :     | _ => extRdy (r, doFns, n)
404 :     (* end case *))
405 :     in
406 :     case bl
407 :     of [(bev, _)] => syncOnOneEvt bev
408 :     | _ => (S.atomicBegin(); ext (bl, []))
409 :     (* end case *)
410 :     end
411 :    
412 :     fun sync ev = (case (force ev)
413 :     of (BASE_GRP bevs) => syncOnBEvts bevs
414 :     | grp => syncOnGrp grp
415 :     (* end case *))
416 :    
417 :     fun select evts = let
418 :     fun forceBL ([], bevs) = BASE_GRP bevs
419 :     | forceBL (evt::r, bevs') = (case (force' evt)
420 :     of (BASE_GRP bevs) => forceBL (r, bevs @ bevs')
421 :     | (GRP grp) => forceL (r, grp @ [BASE_GRP bevs'])
422 :     | grp => forceL (r, [grp, BASE_GRP bevs'])
423 :     (* end case *))
424 :     and forceL ([], [grp]) = grp
425 :     | forceL ([], l) = GRP l
426 :     | forceL (evt :: r, l) = (
427 :     case (force' evt, l)
428 :     of (BASE_GRP bevs, BASE_GRP bevs' :: r') =>
429 :     forceL (r, BASE_GRP(bevs @ bevs') :: r')
430 :     | (GRP grp, l) => forceL (r, grp @ l)
431 :     | (grp, l) => forceL (r, grp :: l)
432 :     (* end case *))
433 :     and force' (GUARD g) = force' (g ())
434 :     | force' (W_NACK f) = let
435 :     val cvar = R.CVAR(ref(R.CVAR_unset []))
436 :     in
437 :     NACK_GRP(cvar, force' (f (cvarGetEvt cvar)))
438 :     end
439 :     | force' (BEVT grp) = BASE_GRP grp
440 :     | force' (CHOOSE evts) = forceBL (evts, [])
441 :     in
442 :     case forceBL(evts, [])
443 :     of (BASE_GRP bevs) => syncOnBEvts bevs
444 :     | grp => syncOnGrp grp
445 :     (* end case *)
446 :     end
447 :    
448 :     end;
449 :    

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0