SCM Repository
Annotation of /sml/trunk/src/cml/src/core-cml/event.sml
Parent Directory
|
Revision Log
Revision 2 - (view) (download)
1 : | monnier | 2 | (* event.sml |
2 : | * | ||
3 : | * COPYRIGHT (c) 1995 AT&T Bell Laboratories. | ||
4 : | * COPYRIGHT (c) 1989-1991 John H. Reppy | ||
5 : | * | ||
6 : | * The representation of event values and the event combinators. | ||
7 : | * | ||
8 : | * Some important requirements on the implementation of base event values: | ||
9 : | * | ||
10 : | * 1) The pollFn, doFn, and blockFn are always called from inside | ||
11 : | * atomic regions. | ||
12 : | * | ||
13 : | * 2) The pollFn returns an integer priority: this is 0 when not enabled, | ||
14 : | * ~1 for fixed priority, and a positive value for dynamic priority. | ||
15 : | * The standard scheme is to associate a counter with the underlying | ||
16 : | * synchronization object, and to increase it by one for each | ||
17 : | * synchronization attempt. | ||
18 : | * | ||
19 : | * 3) The blockFn is responsible for exiting the atomic region; the doFns | ||
20 : | * should NOT leave the atomic region. | ||
21 : | * | ||
22 : | * 4) The blockFn is responsible for executing the "cleanUp" action | ||
23 : | * prior to leaving the atomic region. | ||
24 : | *) | ||
25 : | |||
26 : | structure Event : sig | ||
27 : | |||
28 : | include EVENT | ||
29 : | |||
30 : | val atomicCVarSet : RepTypes.cvar -> unit | ||
31 : | val cvarGetEvt : RepTypes.cvar -> unit event | ||
32 : | |||
33 : | end = struct | ||
34 : | |||
35 : | structure R = RepTypes | ||
36 : | structure S = Scheduler | ||
37 : | |||
38 : | val capture = SMLofNJ.Cont.capture | ||
39 : | val escape = SMLofNJ.Cont.escape | ||
40 : | val callcc = SMLofNJ.Cont.callcc | ||
41 : | val throw = SMLofNJ.Cont.throw | ||
42 : | |||
43 : | (* Some inline functions to improve performance *) | ||
(* map f l: apply f to every element of l, from left to right, and return
 * the list of results.  The recursion is unrolled so that each step
 * consumes four elements; the last zero to three elements are handled
 * directly.
 *)
fun map f = let
      fun go xs = (case xs
             of [] => []
              | [x1] => [f x1]
              | [x1, x2] => let
                  val y1 = f x1
                  val y2 = f x2
                  in
                    [y1, y2]
                  end
              | [x1, x2, x3] => let
                  val y1 = f x1
                  val y2 = f x2
                  val y3 = f x3
                  in
                    [y1, y2, y3]
                  end
              | (x1 :: x2 :: x3 :: x4 :: rest) => let
                  val y1 = f x1
                  val y2 = f x2
                  val y3 = f x3
                  val y4 = f x4
                  in
                    y1 :: y2 :: y3 :: y4 :: go rest
                  end
            (* end case *))
      in
        go
      end
(* app f l: apply f to every element of l, from left to right, for its
 * side effects only.
 *)
fun app f = let
      fun go xs = (case xs
             of [] => ()
              | (x :: rest) => (f x; go rest)
            (* end case *))
      in
        go
      end
(* foldl f init l: combine the elements of l from left to right, calling
 * f (element, accumulator) at each step and starting from init.
 *)
fun foldl f init l = let
      fun loop (acc, []) = acc
        | loop (acc, x :: rest) = loop (f (x, acc), rest)
      in
        loop (init, l)
      end
65 : | |||
(* raise the Fail exception carrying the given message *)
fun error (msg : string) = raise (Fail msg)
67 : | |||
(* Rep is RepTypes viewed through a signature that lists only the event
 * representation used below; its contents are then opened so that the
 * constructors can be used unqualified.
 *)
structure Rep : sig
    (* the result of polling a base event: either it is enabled, with a
     * priority and a function that performs it, or it is blocked, with a
     * function that takes the blocking request record.
     *)
    datatype 'a event_status
      = ENABLED of {prio : int, doFn : unit -> 'a}
      | BLOCKED of {
            transId : R.trans_id ref,
            cleanUp : unit -> unit,
            next : unit -> unit
          } -> 'a
    type 'a base_evt    (* = unit -> 'a event_status *)
    (* the representation of event values *)
    datatype 'a event
      = BEVT of 'a base_evt list
      | CHOOSE of 'a event list
      | GUARD of unit -> 'a event
      | W_NACK of unit event -> 'a event
  end = RepTypes
open Rep
82 : | |||
83 : | |||
84 : | (** Condition variables. Because these variables are set inside atomic | ||
85 : | ** regions, we have to use different conventions for clean-up, etc. | ||
86 : | ** Instead of requiring the blockFn continuation to call the cleanUp | ||
87 : | ** action and to leave the atomic region, we call the cleanUp function | ||
88 : | ** when setting the condition variable (in atomicCVarSet), and have the | ||
89 : | ** invariant that the blockFn continuation is dispatched outside the | ||
90 : | ** atomic region. | ||
91 : | **) | ||
92 : | |||
(* Set a condition variable; we assume that this function is always
 * executed in an atomic region.
 *
 * The variable's state is changed to CVAR_set, so that a later poll in
 * cvarGetEvt finds the event enabled and a second attempt to set the same
 * variable reaches the "cvar already set" error.  The initial count is 1,
 * because cvarGetEvt uses the count as the event's priority and a priority
 * of 0 means "not enabled".
 *
 * Each waiting entry whose transaction has not been cancelled is marked
 * cancelled, has its cleanUp action run, and is added (as a thread-id and
 * continuation pair) in front of the current contents of the rear list of
 * S.rdyQ1.  Entries that are already cancelled are dropped.
 *)
fun atomicCVarSet (R.CVAR(state as ref(R.CVAR_unset waiting))) = let
      val R.Q{rear, ...} = S.rdyQ1
      fun add [] = !rear
        | add ({transId=ref R.CANCEL, ...}::r) = add r
        | add ({transId as ref(R.TRANS tid), cleanUp, kont}::r) = (
            transId := R.CANCEL;
            cleanUp();
            (tid, kont) :: (add r))
      in
        (* record the new state before running any cleanUp actions *)
        state := R.CVAR_set 1;
        rear := add waiting
      end
  | atomicCVarSet (R.CVAR _) = error "cvar already set"
108 : | |||
(* The event constructor for waiting on a cvar.  The result is a single
 * base event.  When polled on a set variable it is enabled, with the
 * variable's current count as priority (the count is then incremented)
 * and S.atomicEnd as its action.  When polled on an unset variable it is
 * blocked: the blocking function captures the current continuation, adds
 * it to the variable's waiting list and then calls next.
 *)
fun cvarGetEvt (R.CVAR cvState) = let
      fun suspend {transId, cleanUp, next} = let
            fun enqueue k = let
                  (* the binding fails if the variable is not in the unset state *)
                  val (R.CVAR_unset waiters) = !cvState
                  val waiter = {transId=transId, cleanUp=cleanUp, kont=k}
                  in
                    cvState := R.CVAR_unset(waiter :: waiters);
                    next ()
                  end
            in
              callcc enqueue
            end
      fun poll () = (case !cvState
             of (R.CVAR_set count) => (
                  cvState := R.CVAR_set(count+1);
                  ENABLED{prio=count, doFn=S.atomicEnd})
              | _ => BLOCKED suspend
            (* end case *))
      in
        BEVT[poll]
      end
127 : | |||
128 : | |||
(* alwaysEvt v: a base event that is always enabled with the fixed
 * priority ~1; performing it calls S.atomicEnd and then yields v.
 *)
fun alwaysEvt v = let
      fun doFn () = (S.atomicEnd(); v)
      fun pollFn () = R.ENABLED{prio = ~1, doFn = doFn}
      in
        BEVT[pollFn]
      end
132 : | |||
(* the event with no base events *)
val never = BEVT[]

(* guard g: an event whose body g is run when the event is forced *)
fun guard g = GUARD g

(* withNack f: an event whose body f receives a unit event when forced *)
fun withNack f = W_NACK f
138 : | |||
(* choose el: build the choice of the events in el, flattening where
 * possible.  As long as only BEVT values are seen, their base events are
 * merged into a single BEVT.  Once some other kind of event is seen, the
 * remaining events are collected into a CHOOSE list, where nested CHOOSE
 * lists are spliced in and a BEVT is merged into a BEVT at the head of
 * the collected list.  A collected list of exactly one event is returned
 * as that event.
 *)
fun choose (el : 'a event list) = let
      fun collectBase (pending, acc) = (case pending
             of [] => BEVT acc
              | (BEVT [] :: rest) => collectBase (rest, acc)
              | (BEVT [b] :: rest) => collectBase (rest, b :: acc)
              | (BEVT bs :: rest) => collectBase (rest, bs @ acc)
              | _ => (case acc
                   of [] => merge (pending, [])
                    | _ => merge (pending, [BEVT acc])
                  (* end case *))
            (* end case *))
      and merge (pending, acc) = (case (pending, acc)
             of ([], [single]) => single
              | ([], _) => CHOOSE acc
              | (CHOOSE es :: rest, _) => merge (rest, es @ acc)
              | (BEVT bs :: rest, BEVT bs' :: acc') =>
                  merge (rest, BEVT(bs @ bs') :: acc')
              | (e :: rest, _) => merge (rest, e :: acc)
            (* end case *))
      in
        (* the input is reversed first because the accumulators are built
         * by prepending.
         *)
        collectBase (rev el, [])
      end
155 : | |||
(* wrap (evt, wfn): an event like evt whose result is passed through wfn.
 * For base events the result of the doFn (when enabled) or of the blockFn
 * (when blocked) is post-processed; choices are wrapped element-wise, and
 * guards and with-nack functions are wrapped when they are applied.
 *)
fun wrap (evt, wfn) = let
      fun liftPoll pollFn = fn () => (case pollFn ()
             of ENABLED{prio, doFn} =>
                  ENABLED{prio = prio, doFn = fn () => wfn (doFn ())}
              | (BLOCKED blockFn) => BLOCKED(fn req => wfn (blockFn req))
            (* end case *))
      fun lift (BEVT bevs) = BEVT(map liftPoll bevs)
        | lift (CHOOSE evts) = CHOOSE(map lift evts)
        | lift (GUARD g) = GUARD(fn () => wrap (g (), wfn))
        | lift (W_NACK f) = W_NACK(fn nack => wrap (f nack, wfn))
      in
        lift evt
      end
168 : | |||
(* wrapHandler (evt, hfn): an event like evt in which any exception raised
 * by a base event's doFn or blockFn is passed to hfn, whose result is
 * used instead.  Choices are wrapped element-wise, and guards and
 * with-nack functions are wrapped when they are applied.
 *)
fun wrapHandler (evt, hfn) = let
      fun protect f x = (f x) handle ex => hfn ex
      fun liftPoll pollFn = fn () => (case pollFn ()
             of ENABLED{prio, doFn} => ENABLED{prio = prio, doFn = protect doFn}
              | (BLOCKED blockFn) => BLOCKED(protect blockFn)
            (* end case *))
      fun lift (BEVT bevs) = BEVT(map liftPoll bevs)
        | lift (CHOOSE evts) = CHOOSE(map lift evts)
        | lift (GUARD g) = GUARD(fn () => wrapHandler (g (), hfn))
        | lift (W_NACK f) = W_NACK(fn nack => wrapHandler (f nack, hfn))
      in
        lift evt
      end
182 : | |||
(* An event with all of its guards forced: a list of base events, a list
 * of groups, or a group paired with the condition variable created for a
 * W_NACK event.
 *)
datatype 'a event_group
  = BASE_GRP of 'a base_evt list
  | GRP of 'a event_group list
  | NACK_GRP of R.cvar * 'a event_group
187 : | |||
(* Force the evaluation of any guards in an event, producing an event
 * group.  A GUARD is run and its result forced; a W_NACK gets a fresh
 * unset condition variable, and its function is applied to the event that
 * waits on that variable.  The members of a CHOOSE are forced in order
 * and merged: while every member yields a BASE_GRP the base events are
 * concatenated into one BASE_GRP; after that, groups are collected into a
 * list (splicing nested GRP lists and merging a BASE_GRP into a BASE_GRP
 * at the head of the list).
 *)
fun force (BEVT l) = BASE_GRP l
  | force evt = let
      fun expand (GUARD g) = expand (g ())
        | expand (W_NACK f) = let
            val nackVar = R.CVAR(ref(R.CVAR_unset []))
            in
              NACK_GRP(nackVar, expand (f (cvarGetEvt nackVar)))
            end
        | expand (BEVT bevs) = BASE_GRP bevs
        | expand (CHOOSE evts) = onlyBase (evts, [])
      (* so far every forced member has been a BASE_GRP *)
      and onlyBase ([], acc) = BASE_GRP acc
        | onlyBase (e :: rest, acc) = (case (expand e)
             of (BASE_GRP bevs) => onlyBase (rest, bevs @ acc)
              | (GRP grps) => mixed (rest, grps @ [BASE_GRP acc])
              | grp => mixed (rest, [grp, BASE_GRP acc])
            (* end case *))
      (* some forced member was not a BASE_GRP *)
      and mixed ([], [grp]) = grp
        | mixed ([], grps) = GRP grps
        | mixed (e :: rest, grps) = (case (expand e, grps)
             of (BASE_GRP bevs, BASE_GRP bevs' :: grps') =>
                  mixed (rest, BASE_GRP(bevs @ bevs') :: grps')
              | (GRP grps', _) => mixed (rest, grps' @ grps)
              | (grp, _) => mixed (rest, grp :: grps)
            (* end case *))
      in
        expand evt
      end
220 : | |||
local
  (* a counter used to pick among equal-priority candidates *)
  val counter = ref 0
  (* return the current counter value modulo bound; the counter is
   * advanced by one, or reset to 0 once it has reached 1000000.
   *)
  fun nextIndex bound = let
        val cur = !counter
        val next = if (cur = 1000000) then 0 else cur + 1
        in
          counter := next;
          Int.rem (cur, bound)
        end
in
(* selectDoFn (l, n): choose one doFn from the (priority, doFn) pairs in l.
 * A single-element list yields its doFn directly.  Otherwise a priority
 * of ~1 is treated as n, the doFn with the largest priority wins, and
 * ties are broken using the counter above.
 *)
fun selectDoFn ([(_, doFn)], _) = doFn
  | selectDoFn (l, n) = let
      fun effective p = if (p = ~1) then n else p
      (* scan (remaining, best priority, number of ties, tied candidates) *)
      fun scan ([], _, _, [only]) = only
        | scan ([], _, ties, cands) = List.nth (cands, nextIndex ties)
        | scan ((p, doFn) :: rest, best, ties, cands) = let
            val p' = effective p
            in
              if (p' > best) then scan (rest, p', 1, [doFn])
              else if (p' < best) then scan (rest, best, ties, cands)
              else scan (rest, best, ties + 1, doFn :: cands)
            end
      in
        scan (l, 0, 0, [])
      end
end
246 : | |||
(* Create a transaction flag holding R.TRANS of the current thread (as
 * reported by S.getCurThread), paired with a function that sets the flag
 * to R.CANCEL.
 *)
fun mkFlg () = let
      val flg = ref(R.TRANS(S.getCurThread()))
      fun cancel () = flg := R.CANCEL
      in
        (flg, cancel)
      end
251 : | |||
(* Synchronize on a single base event: enter an atomic region and poll the
 * event.  If it is enabled, run its doFn; otherwise call its blockFn with
 * a fresh transaction flag, the flag's cancel function as the cleanUp
 * action, and S.atomicDispatch as the next action.
 *)
fun syncOnOneEvt (pollFn : 'a base_evt) = let
      val () = S.atomicBegin ()
      in
        case (pollFn ())
         of ENABLED{doFn, ...} => doFn ()
          | (BLOCKED blockFn) => let
              val (transId, cancel) = mkFlg ()
              in
                blockFn {transId = transId, cleanUp = cancel, next = S.atomicDispatch}
              end
        (* end case *)
      end
261 : | |||
(* This function handles the case of synchronizing on a list of base
 * events (w/o any negative acknowledgements).  It also handles the case
 * of synchronizing on NEVER (the empty list), for which it just calls
 * S.dispatch.
 *
 * The events are polled in order inside an atomic region.  As soon as one
 * is found to be enabled, the remaining events are polled only to collect
 * the other enabled ones, and selectDoFn picks the doFn to run.  If none
 * is enabled, every blockFn is called in turn with one shared transaction
 * flag; the "next" action of each call registers the following blockFn,
 * and the last one calls S.atomicDispatch.  The value returned by
 * whichever blockFn eventually returns is passed to the continuation
 * captured at the start of blocking.
 *)
fun syncOnBEvts [] = S.dispatch()
  | syncOnBEvts [bev] = syncOnOneEvt bev
  | syncOnBEvts bevs = let
      fun blockOnAll blockFns = capture (fn k => let
            val resume = escape k
            val (transId, cancel) = mkFlg()
            fun register [] = S.atomicDispatch ()
              | register (blockFn :: rest) =
                  resume (blockFn {
                      transId = transId,
                      cleanUp = cancel,
                      next = fn () => register rest
                    })
            in
              register blockFns; error "[log]"
            end)
      (* no enabled event has been seen yet *)
      fun pollAll ([], blockFns) = blockOnAll blockFns
        | pollAll (pollFn :: rest, blockFns) = (case pollFn ()
             of ENABLED{prio, doFn} => pollReady (rest, [(prio, doFn)], 1)
              | (BLOCKED blockFn) => pollAll (rest, blockFn :: blockFns)
            (* end case *))
    (** NOTE: maybe we should just keep track of the max priority?
     ** What about fairness to fixed priority events (e.g., always, timeout?)
     **)
      (* at least one enabled event has been seen; n counts them *)
      and pollReady ([], ready, n) = selectDoFn (ready, n) ()
        | pollReady (pollFn :: rest, ready, n) = (case pollFn ()
             of ENABLED{prio, doFn} => pollReady (rest, (prio, doFn) :: ready, n+1)
              | _ => pollReady (rest, ready, n)
            (* end case *))
      in
        S.atomicBegin();
        pollAll (bevs, [])
      end
298 : | |||
(* Walk the event group tree, collecting the base events (each paired with
 * an ack flag), and a list of flag sets.  A flag set is a (cvar, flag
 * list) pair in which the flags are exactly those of the base events that
 * lie inside the NACK_GRP carrying that cvar.
 *
 * Returns (bl, flgSets), where bl is the list of (base event, flag) pairs.
 *
 * Base events that are not inside any NACK_GRP all share one flag that
 * belongs to no flag set.
 *)
fun collect grp = let
      (* collect a group that is inside (or is) a NACK_GRP; every base event
       * gets its own fresh flag.  bl and flgSets are the results
       * accumulated so far and are extended, not replaced.
       *)
      fun gatherWrapped (grp, bl, flgSets) = let
            (* allFlgs accumulates the flags of the base events seen so far
             * within the innermost enclosing NACK_GRP.
             *)
            fun gather (BASE_GRP bevs, bl, allFlgs, flgSets) = let
                  fun append ([], bl, allFlgs) = (bl, allFlgs)
                    | append (bev::r, bl, allFlgs) = let
                        val flg = ref false
                        in
                          append (r, (bev, flg)::bl, flg::allFlgs)
                        end
                  (* extend the accumulated list; starting from [] here
                   * would drop the base events collected earlier.
                   *)
                  val (bl', allFlgs') = append (bevs, bl, allFlgs)
                  in
                    (bl', allFlgs', flgSets)
                  end
              | gather (GRP grp, bl, allFlgs, flgSets) = let
                  fun f (grp', (bl', allFlgs', flgSets')) =
                        gather (grp', bl', allFlgs', flgSets')
                  in
                    foldl f (bl, allFlgs, flgSets) grp
                  end
              | gather (NACK_GRP(cvar, grp), bl, allFlgs, flgSets) = let
                  (* start from an empty flag list so that this cvar's flag
                   * set holds only the flags of its own group, then hand
                   * those flags on to the enclosing group as well.
                   *)
                  val (bl', allFlgs', flgSets') =
                        gather (grp, bl, [], flgSets)
                  in
                    (bl', allFlgs' @ allFlgs, (cvar, allFlgs') :: flgSets')
                  end
            val (bl, _, flgSets) = gather (grp, bl, [], flgSets)
            in
              (bl, flgSets)
            end
      in
        case grp
         of (GRP _) => let
              (* the flag shared by all base events outside any NACK_GRP *)
              val unWrappedFlg = ref false
              fun append ([], bl) = bl
                | append (bev::r, bl) = append(r, (bev, unWrappedFlg)::bl)
              fun gather (BASE_GRP bevs, bl, flgSets) =
                    (append(bevs, bl), flgSets)
                | gather (GRP grp, bl, flgSets) = let
                    fun f (grp', (bl', flgSets')) =
                          gather(grp', bl', flgSets')
                    in
                      foldl f (bl, flgSets) grp
                    end
                | gather (grp as NACK_GRP _, bl, flgSets) =
                    gatherWrapped (grp, bl, flgSets)
              in
                gather (grp, [], [])
              end
          | grp => gatherWrapped (grp, [], [])
        (* end case *)
      end
354 : | |||
(* This function handles the more complicated case of synchronization
 * on groups of events where negative acknowledgements are involved.
 *
 * The group is flattened by collect into a list of (base event, flag)
 * pairs and a list of (cvar, flag list) flag sets.  When a base event is
 * chosen, its flag is set to true and chkCVars is run: every cvar whose
 * flag set contains no true flag (i.e., the chosen event is not one of
 * the events covered by that cvar) is set, which enables the
 * corresponding negative-acknowledgement event.
 *
 * If exactly one base event was collected it is handled by syncOnOneEvt.
 * Otherwise the events are polled in order inside an atomic region, as in
 * syncOnBEvts: enabled events are gathered and one is picked by
 * selectDoFn; if none is enabled, each blockFn is called with a shared
 * transaction flag and a cleanUp action that cancels the transaction,
 * marks that event's flag, and runs chkCVars.
 *)
fun syncOnGrp grp = let
      val (bl, flgSets) = collect grp
      fun chkCVars () = let
          (* chkCVar checks the flags of a flag set.  If they are all false
           * then the corresponding cvar is set to signal the negative ack;
           * if one of them is true, the chosen event belongs to this set
           * and nothing is signalled.
           *)
            fun chkCVar (cvar, flgs) = let
                  fun chkFlgs [] = atomicCVarSet cvar
                    | chkFlgs ((ref true)::_) = ()
                    | chkFlgs (_::r) = chkFlgs r
                  in
                    chkFlgs flgs
                  end
            in
              app chkCVar flgSets
            end
      fun ext ([], blockFns) = capture (fn k => let
            val escape = escape k
            val transId = ref(R.TRANS(S.getCurThread()))
            fun setFlg flg () = (
                  transId := R.CANCEL; flg := true; chkCVars())
            fun log [] = S.atomicDispatch ()
              | log ((blockFn, flg) :: r) =
                  escape (blockFn {
                      transId = transId,
                      cleanUp = setFlg flg,
                      next = fn () => log r
                    })
            in
              log blockFns; error "[log]"
            end)
        | ext ((pollFn, flg) :: r, blockFns) = (case pollFn()
             of ENABLED{prio, doFn} => extRdy (r, [(prio, (doFn, flg))], 1)
              | (BLOCKED blockFn) => ext (r, (blockFn, flg)::blockFns)
            (* end case *))
    (** NOTE: maybe we should just keep track of the max priority?
     ** What about fairness to fixed priority events (e.g., always, timeout?)
     **)
      and extRdy ([], doFns, n) = let
            val (doFn, flg) = selectDoFn (doFns, n)
            in
              flg := true;
              chkCVars ();
              doFn()
            end
        | extRdy ((pollFn, flg) :: r, doFns, n) = (case pollFn()
             of ENABLED{prio, doFn} =>
                  extRdy (r, (prio, (doFn, flg))::doFns, n+1)
              | _ => extRdy (r, doFns, n)
            (* end case *))
      in
        case bl
         of [(bev, _)] => syncOnOneEvt bev
          | _ => (S.atomicBegin(); ext (bl, []))
        (* end case *)
      end
411 : | |||
(* Synchronize on an event: force its guards, then use the simple path
 * when the result is a plain list of base events and the group path
 * otherwise.
 *)
fun sync ev = let
      val grp = force ev
      in
        case grp
         of (BASE_GRP bevs) => syncOnBEvts bevs
          | _ => syncOnGrp grp
        (* end case *)
      end
416 : | |||
(* Synchronize on the choice of a list of events.  The events are forced
 * and merged into an event group directly, without first building a
 * CHOOSE event, and the resulting group is then synchronized on: the
 * simple path is used for a plain list of base events and the group path
 * otherwise.
 *)
fun select evts = let
      fun expand evt = (case evt
             of (GUARD g) => expand (g ())
              | (W_NACK f) => let
                  val nackVar = R.CVAR(ref(R.CVAR_unset []))
                  in
                    NACK_GRP(nackVar, expand (f (cvarGetEvt nackVar)))
                  end
              | (BEVT bevs) => BASE_GRP bevs
              | (CHOOSE es) => baseOnly (es, [])
            (* end case *))
      (* so far every forced event has been a BASE_GRP *)
      and baseOnly (pending, acc) = (case pending
             of [] => BASE_GRP acc
              | (e :: rest) => (case (expand e)
                   of (BASE_GRP bevs) => baseOnly (rest, bevs @ acc)
                    | (GRP grps) => mixed (rest, grps @ [BASE_GRP acc])
                    | grp => mixed (rest, [grp, BASE_GRP acc])
                  (* end case *))
            (* end case *))
      (* some forced event was not a BASE_GRP *)
      and mixed (pending, acc) = (case (pending, acc)
             of ([], [grp]) => grp
              | ([], _) => GRP acc
              | (e :: rest, _) => (case (expand e, acc)
                   of (BASE_GRP bevs, BASE_GRP bevs' :: acc') =>
                        mixed (rest, BASE_GRP(bevs @ bevs') :: acc')
                    | (GRP grps, _) => mixed (rest, grps @ acc)
                    | (grp, _) => mixed (rest, grp :: acc)
                  (* end case *))
            (* end case *))
      in
        case baseOnly (evts, [])
         of (BASE_GRP bevs) => syncOnBEvts bevs
          | grp => syncOnGrp grp
        (* end case *)
      end
447 : | |||
448 : | end; | ||
449 : |
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |