SCM Repository
Annotation of /sml/trunk/src/cml/src/core-cml/event.sml
Parent Directory
|
Revision Log
Revision 651 - (view) (download)
1 : | monnier | 2 | (* event.sml |
2 : | * | ||
3 : | * COPYRIGHT (c) 1995 AT&T Bell Laboratories. | ||
4 : | * COPYRIGHT (c) 1989-1991 John H. Reppy | ||
5 : | * | ||
6 : | * The representation of event values and the event combinators. | ||
7 : | * | ||
8 : | * Some important requirements on the implementation of base event values: | ||
9 : | * | ||
10 : | * 1) The pollFn, doFn, and blockFn are always called from inside | ||
11 : | * atomic regions. | ||
12 : | * | ||
13 : | * 2) The pollFn returns an integer priority: this is 0 when not enabled, | ||
14 : | * ~1 for fixed priority, and a positive value for dynamic priority. | ||
15 : | * The standard scheme is to associate a counter with the underlying | ||
16 : | * synchronization object, and to increase it by one for each | ||
17 : | * synchronization attempt. | ||
18 : | * | ||
19 : | * 3) The blockFn is responsible for exiting the atomic region; the doFns | ||
20 : | * should NOT leave the atomic region. | ||
21 : | * | ||
22 : | * 4) The blockFn is responsible for executing the "cleanUp" action | ||
23 : | * prior to leaving the atomic region. | ||
24 : | *) | ||
25 : | |||
26 : | structure Event : sig | ||
27 : | |||
28 : | include EVENT | ||
29 : | |||
30 : | val atomicCVarSet : RepTypes.cvar -> unit | ||
31 : | val cvarGetEvt : RepTypes.cvar -> unit event | ||
32 : | |||
33 : | end = struct | ||
34 : | |||
35 : | structure R = RepTypes | ||
36 : | structure S = Scheduler | ||
37 : | |||
38 : | val capture = SMLofNJ.Cont.capture | ||
39 : | val escape = SMLofNJ.Cont.escape | ||
40 : | val callcc = SMLofNJ.Cont.callcc | ||
41 : | val throw = SMLofNJ.Cont.throw | ||
42 : | |||
43 : | (* Some inline functions to improve performance *) | ||
(* map f l: apply f to the elements of l from left to right and return the
 * list of results.  The traversal is unrolled so that each recursive call
 * consumes up to four elements.
 *)
fun map f = let
      fun go l = (case l
             of (w::x::y::z::rest) => (f w)::(f x)::(f y)::(f z)::(go rest)
              | [x, y, z] => [f x, f y, f z]
              | [x, y] => [f x, f y]
              | [x] => [f x]
              | [] => []
            (* end case *))
      in
        go
      end
(* app f l: apply f to each element of l, from left to right, for its
 * side effects only.
 *)
fun app f = let
      fun loop l = (case l
             of [] => ()
              | (first::rest) => (f first; loop rest)
            (* end case *))
      in
        loop
      end
(* foldl f init l: fold f over l from left to right; f receives the pair
 * (element, accumulator) and returns the new accumulator.
 *)
fun foldl f init l = let
      fun go (acc, []) = acc
        | go (acc, x::rest) = go (f (x, acc), rest)
      in
        go (init, l)
      end
65 : | |||
(* report an internal inconsistency by raising Fail with the given message *)
fun error msg = raise (Fail msg)
67 : | |||
68 : | monnier | 8 | datatype event_status = datatype RepTypes.event_status |
69 : | type 'a base_evt = 'a RepTypes.base_evt | ||
70 : | datatype event = datatype RepTypes.event | ||
71 : | monnier | 2 | |
72 : | |||
73 : | (** Condition variables. Because these variables are set inside atomic | ||
74 : | ** regions, we have to use different conventions for clean-up, etc. | ||
75 : | ** Instead of requiring the blockFn continuation to call the cleanUp | ||
76 : | ** action and to leave the atomic region, we call the cleanUp function | ||
77 : | ** when setting the condition variable (in atomicCVarSet), and have the | ||
78 : | ** invariant that the blockFn continuation is dispatched outside the | ||
79 : | ** atomic region. | ||
80 : | **) | ||
81 : | |||
(* set a condition variable; we assume that this function is always
 * executed in an atomic region.
 *
 * If the variable is unset, it is marked as set (with its counter at 1)
 * and every waiting transaction that has not been cancelled is cancelled,
 * has its cleanUp action run, and is appended to the rear of the ready
 * queue S.rdyQ1 as a (thread id, continuation) pair.  Setting a variable
 * that is already set raises Fail "cvar already set".
 *)
fun atomicCVarSet (R.CVAR state) = (case !state
       of (R.CVAR_unset waiting) => let
            val R.Q{rear, ...} = S.rdyQ1
          (* build the new rear list.  The current contents of rear are
           * read only after all of the cleanUp actions have run; a cleanUp
           * action may itself set other condition variables (see setFlg in
           * syncOnGrp), which also updates rear.
           *)
            fun add [] = !rear
              (* this waiter has already been cancelled: drop it *)
              | add ({transId=ref R.CANCEL, ...}::r) = add r
              | add ({transId as ref(R.TRANS tid), cleanUp, kont}::r) = (
                  transId := R.CANCEL;
                  cleanUp();
                  (tid, kont) :: (add r))
            in
            (* mark the variable as set before running any cleanUp action *)
              state := R.CVAR_set 1;
              rear := add waiting
            end
        | _ => error "cvar already set"
      (* end case *))
100 : | monnier | 2 | |
(* the event constructor for waiting on a cvar.  The result is a single
 * base event: when the variable is set, polling reports it enabled with
 * the variable's counter as priority (and bumps the counter); otherwise
 * the blocking function adds the caller to the variable's waiting list.
 *)
fun cvarGetEvt (R.CVAR(state)) = let
    (* enqueue the calling thread's continuation on the waiting list and
     * then run the "next" action.
     *)
      fun blockFn {transId, cleanUp, next} = callcc (fn resumeK => let
            val (R.CVAR_unset waiters) = !state
            in
              state := R.CVAR_unset(
                {transId=transId, cleanUp=cleanUp, kont=resumeK} :: waiters);
              next ()
            end)
    (* committing to the event resets the counter and leaves the atomic region *)
      fun commit () = (state := R.CVAR_set 1; S.atomicEnd())
      fun pollFn () = (case !state
             of (R.CVAR_set cnt) => (
                  state := R.CVAR_set(cnt+1);
                  ENABLED{prio=cnt, doFn=commit})
              | _ => BLOCKED blockFn
            (* end case *))
      in
        BEVT[pollFn]
      end
122 : | |||
123 : | |||
(* alwaysEvt v: an event that is always enabled, with fixed priority (~1);
 * committing to it leaves the atomic region and yields v.
 *)
fun alwaysEvt v = let
      fun doFn () = (S.atomicEnd(); v)
      fun pollFn () = R.ENABLED{prio = ~1, doFn = doFn}
      in
        BEVT[pollFn]
      end
127 : | |||
(* the event with no base events; synchronizing on it takes the
 * empty-list case of syncOnBEvts.
 *)
val never = BEVT[]

(* guard and withNack are just the GUARD and W_NACK constructors; the
 * functions they carry are applied when the event is forced (see force).
 *)
val guard = GUARD

val withNack = W_NACK
133 : | |||
(* choose el: combine a list of events into one event, flattening where
 * possible.  The list is processed in reverse so that prepending to the
 * accumulator preserves the original order of the events.
 *)
fun choose (el : 'a event list) = let
    (* first phase: as long as only BEVT values have been seen, merge
     * their base events into a single list.  The last two clauses are
     * reached when the head of the list is not a BEVT; they switch to
     * the general "gather" phase, carrying over any base events that
     * were collected so far as one BEVT.
     *)
      fun gatherBEvts ([], l) = BEVT l
        | gatherBEvts (BEVT[] :: r, l) = gatherBEvts (r, l)
        | gatherBEvts (BEVT[bev] :: r, bevs') = gatherBEvts (r, bev::bevs')
        | gatherBEvts (BEVT bevs :: r, bevs') = gatherBEvts (r, bevs @ bevs')
        | gatherBEvts (evts, []) = gather (evts, [])
        | gatherBEvts (evts, l) = gather (evts, [BEVT l])
    (* second phase: nested CHOOSE lists are spliced into the result, a
     * BEVT is merged with a BEVT at the head of the accumulator, and any
     * other event is kept as is.  A single resulting event is returned
     * directly instead of being wrapped in CHOOSE.
     *)
      and gather ([], [evt]) = evt
        | gather ([], evts) = CHOOSE evts
        | gather (CHOOSE evts :: r, evts') = gather (r, evts @ evts')
        | gather (BEVT bevs :: r, BEVT bevs' :: r')
            = gather (r, BEVT(bevs @ bevs') :: r')
        | gather (evt :: r, evts') = gather (r, evt :: evts')
      in
        gatherBEvts (rev el, [])
      end
150 : | |||
(* wrap (evt, wfn): an event that behaves like evt, except that wfn is
 * applied to the result of synchronization.  Guards and nack functions
 * are wrapped lazily, i.e., when they are eventually applied.
 *)
fun wrap (evt, wfn) = let
    (* post-compose wfn onto whichever function the poll result carries *)
      fun wrapStatus (ENABLED{prio, doFn}) =
            ENABLED{prio=prio, doFn = fn () => wfn (doFn ())}
        | wrapStatus (BLOCKED blockFn) =
            BLOCKED(fn arg => wfn (blockFn arg))
      fun wrapBaseEvt pollFn = fn () => wrapStatus (pollFn ())
      fun lift ev = (case ev
             of (BEVT bevs) => BEVT(map wrapBaseEvt bevs)
              | (CHOOSE evts) => CHOOSE(map lift evts)
              | (GUARD g) => GUARD(fn () => wrap(g(), wfn))
              | (W_NACK f) => W_NACK(fn nack => wrap(f nack, wfn))
            (* end case *))
      in
        lift evt
      end
163 : | |||
(* wrapHandler (evt, hfn): an event that behaves like evt, except that any
 * exception raised while running its doFn or blockFn is passed to hfn,
 * whose result is used instead.
 *)
fun wrapHandler (evt, hfn) = let
    (* run f under the handler hfn *)
      fun guarded f = fn x => ((f x) handle exn => hfn exn)
      fun wrapStatus (ENABLED{prio, doFn}) =
            ENABLED{prio=prio, doFn = guarded doFn}
        | wrapStatus (BLOCKED blockFn) = BLOCKED(guarded blockFn)
      fun wrapBaseEvt pollFn = fn () => wrapStatus (pollFn ())
      fun lift ev = (case ev
             of (BEVT bevs) => BEVT(map wrapBaseEvt bevs)
              | (CHOOSE evts) => CHOOSE(map lift evts)
              | (GUARD g) => GUARD(fn () => wrapHandler(g(), hfn))
              | (W_NACK f) => W_NACK(fn nack => wrapHandler(f nack, hfn))
            (* end case *))
      in
        lift evt
      end
177 : | |||
(* The result of forcing an event (see force): a tree with no remaining
 * guards or nack functions.
 *)
datatype 'a event_group
  = BASE_GRP of 'a base_evt list		(* a flat list of base events *)
  | GRP of 'a event_group list			(* a choice of sub-groups *)
  | NACK_GRP of (R.cvar * 'a event_group)	(* a group produced by a W_NACK, *)
						(* paired with the cvar created *)
						(* for it when it was forced *)
182 : | |||
(* force the evaluation of any guards in an event group.  The result is an
 * event_group tree: guard functions are applied, and each W_NACK function
 * is applied to an event on a freshly created (unset) condition variable,
 * with the result recorded as a NACK_GRP on that variable.
 *)
fun force (BEVT l) = BASE_GRP l		(* fast path: nothing to force *)
  | force evt = let
      fun force' (GUARD g) = force' (g ())
        | force' (W_NACK f) = let
            val cvar = R.CVAR(ref(R.CVAR_unset []))
            in
              NACK_GRP(cvar, force' (f (cvarGetEvt cvar)))
            end
        | force' (BEVT grp) = BASE_GRP grp
        | force' (CHOOSE evts) = let
          (* forceBL is used while every forced sub-event has been a
           * BASE_GRP; the base events are accumulated in one list.  On
           * the first sub-event that forces to something else, control
           * passes to forceL with the base events seen so far kept as
           * one BASE_GRP in the accumulated group list.
           *)
            fun forceBL ([], bevs) = BASE_GRP bevs
              | forceBL (evt::r, bevs') = (case (force' evt)
                   of (BASE_GRP bevs) => forceBL (r, bevs @ bevs')
                    | (GRP grp) => forceL (r, grp @ [BASE_GRP bevs'])
                    | grp => forceL (r, [grp, BASE_GRP bevs'])
                  (* end case *))
          (* forceL accumulates a list of groups: a BASE_GRP is merged
           * with a BASE_GRP at the head of the list, a nested GRP is
           * spliced in, and anything else is consed on.  A singleton
           * result is returned without the GRP wrapper.
           *)
            and forceL ([], [grp]) = grp
              | forceL ([], l) = GRP l
              | forceL (evt :: r, l) = (
                  case (force' evt, l)
                   of (BASE_GRP bevs, BASE_GRP bevs' :: r') =>
                        forceL (r, BASE_GRP(bevs @ bevs') :: r')
                    | (GRP grp, l) => forceL (r, grp @ l)
                    | (grp, l) => forceL (r, grp :: l)
                  (* end case *))
            in
              forceBL (evts, [])
            end
      in
        force' evt
      end
215 : | |||
local
(* a cheap deterministic stand-in for a random number generator: a counter
 * that cycles through 0..1000000, reduced modulo the requested bound.
 *)
  val cnt = ref 0
  fun random i = let
        val j = !cnt
        in
          cnt := (if (j = 1000000) then 0 else j+1);
          Int.rem(j, i)
        end
in
(* selectDoFn (l, n): given a list of (priority, doFn) pairs and the number
 * n of enabled events, return the doFn with the highest priority, where a
 * fixed priority (~1) is treated as n.  Ties are broken using random.
 *)
fun selectDoFn ([(_, doFn)], _) = doFn
  | selectDoFn (l, n) = let
      fun priority p = if (p = ~1) then n else p
    (* scan (remaining, best priority so far, number of candidates with
     * that priority, those candidates)
     *)
      fun scan ([], _, _, [doFn]) = doFn
        | scan ([], _, k, doFns) = List.nth(doFns, random k)
        | scan ((p, doFn)::r, maxP, k, doFns) = let
            val p' = priority p
            in
              if (p' > maxP) then scan(r, p', 1, [doFn])
              else if (p' < maxP) then scan(r, maxP, k, doFns)
              else scan(r, maxP, k+1, doFn::doFns)
            end
      in
        scan (l, 0, 0, [])
      end
end
241 : | |||
(* create a fresh transaction flag for the current thread, paired with the
 * action that cancels it.
 *)
fun mkFlg () = let
      val transId = ref(R.TRANS(S.getCurThread()))
      fun cancel () = transId := R.CANCEL
      in
        (transId, cancel)
      end
246 : | |||
(* synchronize on a single base event: enter an atomic region and poll the
 * event; if it is enabled, commit to it, otherwise block on it with a
 * fresh transaction flag and S.atomicDispatch as the "next" action.
 *)
fun syncOnOneEvt (pollFn : 'a base_evt) = let
      val _ = S.atomicBegin ()
      in
        case (pollFn())
         of ENABLED{doFn, ...} => doFn()
          | (BLOCKED blockFn) => let
              val (transId, cancel) = mkFlg()
              in
                blockFn{transId=transId, cleanUp=cancel, next=S.atomicDispatch}
              end
        (* end case *)
      end
256 : | |||
(* this function handles the case of synchronizing on a list of
 * base events (w/o any negative acknowledgements).  It also handles
 * the case of synchronizing on NEVER.
 *
 * The events are polled in order inside an atomic region.  As soon as one
 * is found enabled, the rest are polled only to collect the other enabled
 * events, and selectDoFn picks the one to commit to.  If none is enabled,
 * the thread blocks on all of them under one shared transaction flag.
 *)
fun syncOnBEvts [] = S.dispatch()
  | syncOnBEvts [bev] = syncOnOneEvt bev
  | syncOnBEvts bevs = let
    (* ext: nothing enabled so far; blockFns holds the blocking functions
     * of the events polled so far (most recently polled first).
     *)
      fun ext ([], blockFns) = capture (fn k => let
            val escape = escape k
            val (transId, setFlg) = mkFlg()
          (* call each blockFn in turn; the "next" action passed to a
           * blockFn continues with the remaining ones, and the last one
           * dispatches another thread.  If a blockFn call returns a
           * value, that value is passed to the captured continuation k.
           *)
            fun log [] = S.atomicDispatch ()
              | log (blockFn :: r) =
                  escape (blockFn {
                      transId = transId,
                      cleanUp = setFlg,
                      next = fn () => log r
                    })
            in
            (* NOTE(review): log is presumably never expected to return
             * normally; the error marks that case -- confirm.
             *)
              log blockFns; error "[log]"
            end)
        | ext (pollFn :: r, blockFns) = (case pollFn()
             of ENABLED{prio, doFn} => extRdy (r, [(prio, doFn)], 1)
              | (BLOCKED blockFn) => ext (r, blockFn::blockFns)
            (* end case *))
    (** NOTE: maybe we should just keep track of the max priority?
     ** What about fairness to fixed priority events (e.g., always, timeout?)
     **)
    (* extRdy: at least one event is enabled; doFns holds the
     * (priority, doFn) pairs found so far and n is their number.
     *)
      and extRdy ([], doFns, n) = selectDoFn (doFns, n) ()
        | extRdy (pollFn :: r, doFns, n) = (case pollFn()
             of ENABLED{prio, doFn} => extRdy (r, (prio, doFn)::doFns, n+1)
              | _ => extRdy (r, doFns, n)
            (* end case *))
      in
        S.atomicBegin();
        ext (bevs, [])
      end
293 : | |||
(* walk the event group tree, collecting the base events (with associated
 * ack flags), and a list of (cvar * ack flag set) pairs.
 *
 * The result is (bl, flgSets).  bl pairs every base event in the tree with
 * a boolean flag: base events outside of any NACK_GRP all share a single
 * flag, while each base event inside a NACK_GRP gets a flag of its own.
 * flgSets pairs the cvar of each NACK_GRP with the flags of exactly the
 * base events contained in that group (including those of nack groups
 * nested inside it).
 *)
fun collect grp = let
    (* collect a sub-tree that is inside at least one NACK_GRP.  The
     * allFlgs accumulator holds the flags of the innermost enclosing
     * nack group that have been seen so far.
     *)
      fun gatherWrapped (grp, bl, flgSets) = let
            fun gather (BASE_GRP bevs, bl, allFlgs, flgSets) = let
                  fun append ([], bl, allFlgs) = (bl, allFlgs)
                    | append (bev::r, bl, allFlgs) = let
                        val flg = ref false
                        in
                          append (r, (bev, flg)::bl, flg::allFlgs)
                        end
                (* extend the list of base events collected so far; starting
                 * from the empty list here would discard every event that
                 * was collected before this group was reached.
                 *)
                  val (bl', allFlgs') = append (bevs, bl, allFlgs)
                  in
                    (bl', allFlgs', flgSets)
                  end
              | gather (GRP grp, bl, allFlgs, flgSets) = let
                  fun f (grp', (bl', allFlgs', flgSets')) =
                        gather (grp', bl', allFlgs', flgSets')
                  in
                    foldl f (bl, allFlgs, flgSets) grp
                  end
              | gather (NACK_GRP(cvar, grp), bl, allFlgs, flgSets) = let
                (* collect the nested group's flags separately, so that its
                 * flag set does not pick up the flags of earlier siblings
                 * in the enclosing group; the enclosing group still gets
                 * the nested flags added to its own.
                 *)
                  val (bl', nestedFlgs, flgSets') =
                        gather (grp, bl, [], flgSets)
                  in
                    (bl', nestedFlgs @ allFlgs, (cvar, nestedFlgs) :: flgSets')
                  end
            val (bl, _, flgSets) = gather (grp, bl, [], flgSets)
            in
              (bl, flgSets)
            end
      in
        case grp
         of (GRP _) => let
            (* the flag shared by all base events that are not inside a
             * nack group; it belongs to no flag set.
             *)
              val unWrappedFlg = ref false
              fun append ([], bl) = bl
                | append (bev::r, bl) = append(r, (bev, unWrappedFlg)::bl)
              fun gather (BASE_GRP bevs, bl, flgSets) =
                    (append(bevs, bl), flgSets)
                | gather (GRP grp, bl, flgSets) = let
                    fun f (grp', (bl', flgSets')) =
                          gather(grp', bl', flgSets')
                    in
                      foldl f (bl, flgSets) grp
                    end
                | gather (grp as NACK_GRP _, bl, flgSets) =
                    gatherWrapped (grp, bl, flgSets)
              in
                gather (grp, [], [])
              end
          | grp => gatherWrapped (grp, [], [])
        (* end case *)
      end
349 : | |||
(* this function handles the more complicated case of synchronization
 * on groups of events where negative acknowledgements are involved.
 *
 * Each base event carries a flag (see collect) that is set to true when
 * that event is the one chosen.  Once the choice has been made, the cvar
 * of every nack group that does NOT contain the chosen event is set,
 * which enables that group's negative acknowledgement event.
 *)
fun syncOnGrp grp = let
      val (bl, flgSets) = collect grp
    (* signal the negative acknowledgements; this must be called in an
     * atomic region, after the flag of the chosen event has been set.
     *)
      fun chkCVars () = let
          (* chkCVar checks the flags of a flag set.  If none of them is
           * set, then no event of the group was chosen, and the group's
           * cvar is set to signal the negative acknowledgement.
           *)
            fun chkCVar (cvar, flgs) = let
                  fun chkFlgs [] = atomicCVarSet cvar
                    | chkFlgs ((ref true)::_) = ()
                    | chkFlgs (_::r) = chkFlgs r
                  in
                    chkFlgs flgs
                  end
            in
              app chkCVar flgSets
            end
    (* ext: nothing enabled so far; blockFns holds the blocking functions
     * (with their flags) of the events polled so far.
     *)
      fun ext ([], blockFns) = capture (fn k => let
            val escape = escape k
            val transId = ref(R.TRANS(S.getCurThread()))
          (* the clean-up action for the event with flag flg: cancel the
           * transaction, record the event as the chosen one, and signal
           * the negative acknowledgements.
           *)
            fun setFlg flg () = (
                  transId := R.CANCEL; flg := true; chkCVars())
          (* call each blockFn in turn; the "next" action passed to a
           * blockFn continues with the remaining ones, and the last one
           * dispatches another thread.  If a blockFn call returns a
           * value, that value is passed to the captured continuation k.
           *)
            fun log [] = S.atomicDispatch ()
              | log ((blockFn, flg) :: r) =
                  escape (blockFn {
                      transId = transId,
                      cleanUp = setFlg flg,
                      next = fn () => log r
                    })
            in
              log blockFns; error "[log]"
            end)
        | ext ((pollFn, flg) :: r, blockFns) = (case pollFn()
             of ENABLED{prio, doFn} => extRdy (r, [(prio, (doFn, flg))], 1)
              | (BLOCKED blockFn) => ext (r, (blockFn, flg)::blockFns)
            (* end case *))
    (** NOTE: maybe we should just keep track of the max priority?
     ** What about fairness to fixed priority events (e.g., always, timeout?)
     **)
    (* extRdy: at least one event is enabled; doFns holds the
     * (priority, (doFn, flag)) entries found so far and n is their number.
     *)
      and extRdy ([], doFns, n) = let
            val (doFn, flg) = selectDoFn (doFns, n)
            in
              flg := true;
              chkCVars ();
              doFn()
            end
        | extRdy ((pollFn, flg) :: r, doFns, n) = (case pollFn()
             of ENABLED{prio, doFn} =>
                  extRdy (r, (prio, (doFn, flg))::doFns, n+1)
              | _ => extRdy (r, doFns, n)
            (* end case *))
      in
        case bl
         of [(bev, _)] => syncOnOneEvt bev
          | _ => (S.atomicBegin(); ext (bl, []))
        (* end case *)
      end
406 : | |||
(* synchronize on an event: force its guards, then use the simple
 * algorithm when the result is a flat list of base events and the
 * general one otherwise.
 *)
fun sync ev = let
      val grp = force ev
      in
        case grp
         of (BASE_GRP bevs) => syncOnBEvts bevs
          | _ => syncOnGrp grp
        (* end case *)
      end
411 : | |||
(* select evts: synchronize on a choice of the events in evts.
 *
 * Forcing a CHOOSE node (see force) performs exactly the traversal that
 * used to be written out a second time here, so this simply delegates to
 * sync; keeping one copy means the two entry points cannot drift apart.
 *)
fun select evts = sync (CHOOSE evts)
442 : | |||
443 : | end; | ||
444 : |
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |