SCM Repository
View of /sml/trunk/src/cml/src/core-cml/event.sml
Parent Directory
|
Revision Log
Revision 9 -
(download)
(annotate)
Sun Jan 18 01:01:29 1998 UTC (23 years, 1 month ago) by monnier
File size: 13461 byte(s)
Sun Jan 18 01:01:29 1998 UTC (23 years, 1 month ago) by monnier
File size: 13461 byte(s)
This commit was generated by cvs2svn to compensate for changes in r8, which included commits to RCS files with non-trunk default branches.
(* event.sml * * COPYRIGHT (c) 1995 AT&T Bell Laboratories. * COPYRIGHT (c) 1989-1991 John H. Reppy * * The representation of event values and the event combinators. * * Some important requirements on the implementation of base event values: * * 1) The pollFn, doFn, and blockFn are always called from inside * atomic regions. * * 2) The pollFn returns an integer priority: this is 0 when not enabled, * ~1 for fixed priority, and a positive value for dynamic priority. * The standard scheme is to associate a counter with the underlying * synchronization object, and to increase it by one for each * synchronization attempt. * * 3) The blockFn is responsible for exiting the atomic region; the doFns * should NOT leave the atomic region. * * 4) The blockFn is responsible for executing the "cleanUp" action * prior to leaving the atomic region. *) structure Event : sig include EVENT val atomicCVarSet : RepTypes.cvar -> unit val cvarGetEvt : RepTypes.cvar -> unit event end = struct structure R = RepTypes structure S = Scheduler val capture = SMLofNJ.Cont.capture val escape = SMLofNJ.Cont.escape val callcc = SMLofNJ.Cont.callcc val throw = SMLofNJ.Cont.throw (* Some inline functions to improve performance *) fun map f = let fun mapf [] = [] | mapf [a] = [f a] | mapf [a, b] = [f a, f b] | mapf [a, b, c] = [f a, f b, f c] | mapf (a::b::c::d::r) = (f a)::(f b)::(f c)::(f d)::(mapf r) in mapf end fun app f = let fun appf [] = () | appf (x::r) = (f x; appf r) in appf end fun foldl f init l = let fun foldf ([], accum) = accum | foldf (x::r, accum) = foldf(r, f(x, accum)) in foldf (l, init) end fun error msg = raise Fail msg datatype event_status = datatype RepTypes.event_status type 'a base_evt = 'a RepTypes.base_evt datatype event = datatype RepTypes.event (** Condition variables. Because these variables are set inside atomic ** regions, we have to use different conventions for clean-up, etc. ** Instead of requiring the blockFn continuation to call the cleanUp ** action and to leave the atomic region, we call the cleanUp function ** when setting the condition variable (in atomicCVarSet), and have the ** invariant that the blockFn continuation is dispatched outside the ** atomic region. **) (* set a condition variable; we assume that this function is always * executed in an atomic region. *) fun atomicCVarSet (R.CVAR state) = (case !state of (R.CVAR_unset waiting) => let val R.Q{rear, ...} = S.rdyQ1 fun add [] = !rear | add ({transId=ref R.CANCEL, ...}::r) = add r | add ({transId as ref(R.TRANS tid), cleanUp, kont}::r) = ( transId := R.CANCEL; cleanUp(); (tid, kont) :: (add r)) in state := R.CVAR_set 1; rear := add waiting end | _ => error "cvar already set" (* end case *)) (* the event constructor for waiting on a cvar *) fun cvarGetEvt (R.CVAR(state)) = let fun blockFn {transId, cleanUp, next} = callcc (fn k => let val (R.CVAR_unset waiting) = !state val item = {transId=transId, cleanUp=cleanUp, kont=k} in state := R.CVAR_unset(item :: waiting); next () end) fun pollFn () = (case !state of (R.CVAR_set n) => let fun doFn () = (state := R.CVAR_set 1; S.atomicEnd()) in state := R.CVAR_set(n+1); ENABLED{prio=n, doFn=doFn} end | _ => BLOCKED blockFn (* end case *)) in BEVT[pollFn] end fun alwaysEvt v = BEVT[fn () => R.ENABLED{ prio= ~1, doFn=fn () => (S.atomicEnd(); v) }] val never = BEVT[] val guard = GUARD val withNack = W_NACK fun choose (el : 'a event list) = let fun gatherBEvts ([], l) = BEVT l | gatherBEvts (BEVT[] :: r, l) = gatherBEvts (r, l) | gatherBEvts (BEVT[bev] :: r, bevs') = gatherBEvts (r, bev::bevs') | gatherBEvts (BEVT bevs :: r, bevs') = gatherBEvts (r, bevs @ bevs') | gatherBEvts (evts, []) = gather (evts, []) | gatherBEvts (evts, l) = gather (evts, [BEVT l]) and gather ([], [evt]) = evt | gather ([], evts) = CHOOSE evts | gather (CHOOSE evts :: r, evts') = gather (r, evts @ evts') | gather (BEVT bevs :: r, BEVT bevs' :: r') = gather (r, BEVT(bevs @ bevs') :: r') | gather (evt :: r, evts') = gather (r, evt :: evts') in gatherBEvts (rev el, []) end fun wrap (evt, wfn) = let fun wrapBaseEvt pollFn () = (case pollFn() of ENABLED{prio, doFn} => ENABLED{prio=prio, doFn = wfn o doFn} | (BLOCKED blockFn) => BLOCKED(wfn o blockFn) (* end case *)) fun wrap' (BEVT bevs) = BEVT(map wrapBaseEvt bevs) | wrap' (CHOOSE evts) = CHOOSE(map wrap' evts) | wrap' (GUARD g) = GUARD(fn () => wrap(g(), wfn)) | wrap' (W_NACK f) = W_NACK(fn evt => wrap(f evt, wfn)) in wrap' evt end fun wrapHandler (evt, hfn) = let fun wrap f x = ((f x) handle exn => hfn exn) fun wrapBaseEvt pollFn () = (case pollFn() of ENABLED{prio, doFn} => ENABLED{prio=prio, doFn = wrap doFn} | (BLOCKED blockFn) => BLOCKED(wrap blockFn) (* end case *)) fun wrap' (BEVT bevs) = BEVT(map wrapBaseEvt bevs) | wrap' (CHOOSE evts) = CHOOSE(map wrap' evts) | wrap' (GUARD g) = GUARD(fn () => wrapHandler(g(), hfn)) | wrap' (W_NACK f) = W_NACK(fn evt => wrapHandler(f evt, hfn)) in wrap' evt end datatype 'a event_group = BASE_GRP of 'a base_evt list | GRP of 'a event_group list | NACK_GRP of (R.cvar * 'a event_group) (* force the evaluation of any guards in an event group. *) fun force (BEVT l) = BASE_GRP l | force evt = let fun force' (GUARD g) = force' (g ()) | force' (W_NACK f) = let val cvar = R.CVAR(ref(R.CVAR_unset [])) in NACK_GRP(cvar, force' (f (cvarGetEvt cvar))) end | force' (BEVT grp) = BASE_GRP grp | force' (CHOOSE evts) = let fun forceBL ([], bevs) = BASE_GRP bevs | forceBL (evt::r, bevs') = (case (force' evt) of (BASE_GRP bevs) => forceBL (r, bevs @ bevs') | (GRP grp) => forceL (r, grp @ [BASE_GRP bevs']) | grp => forceL (r, [grp, BASE_GRP bevs']) (* end case *)) and forceL ([], [grp]) = grp | forceL ([], l) = GRP l | forceL (evt :: r, l) = ( case (force' evt, l) of (BASE_GRP bevs, BASE_GRP bevs' :: r') => forceL (r, BASE_GRP(bevs @ bevs') :: r') | (GRP grp, l) => forceL (r, grp @ l) | (grp, l) => forceL (r, grp :: l) (* end case *)) in forceBL (evts, []) end in force' evt end local val cnt = ref 0 fun random i = let val j = !cnt in if (j = 1000000) then cnt := 0 else cnt := j+1; Int.rem(j, i) end in fun selectDoFn ([(_, doFn)], _) = doFn | selectDoFn (l, n) = let fun priority ~1 = n | priority p = p fun max ((p, doFn)::r, maxP, k, doFns) = let val p = priority p in if (p > maxP) then max(r, p, 1, [doFn]) else if (p = maxP) then max(r, maxP, k+1, doFn::doFns) else max(r, maxP, k, doFns) end | max ([], _, k, [doFn]) = doFn | max ([], _, k, doFns) = List.nth(doFns, random k) in max (l, 0, 0, []) end end fun mkFlg () = let val flg = ref(R.TRANS(S.getCurThread())) in (flg, fn () => flg := R.CANCEL) end fun syncOnOneEvt (pollFn : 'a base_evt) = ( S.atomicBegin (); case (pollFn()) of ENABLED{doFn, ...} => doFn() | (BLOCKED blockFn) => let val (flg, setFlg) = mkFlg() in blockFn{transId=flg, cleanUp=setFlg, next=S.atomicDispatch} end (* end case *)) (* this function handles the case of synchronizing on a list of * base events (w/o any negative acknowledgements). It also handles * the case of synchronizing on NEVER. *) fun syncOnBEvts [] = S.dispatch() | syncOnBEvts [bev] = syncOnOneEvt bev | syncOnBEvts bevs = let fun ext ([], blockFns) = capture (fn k => let val escape = escape k val (transId, setFlg) = mkFlg() fun log [] = S.atomicDispatch () | log (blockFn :: r) = escape (blockFn { transId = transId, cleanUp = setFlg, next = fn () => log r }) in log blockFns; error "[log]" end) | ext (pollFn :: r, blockFns) = (case pollFn() of ENABLED{prio, doFn} => extRdy (r, [(prio, doFn)], 1) | (BLOCKED blockFn) => ext (r, blockFn::blockFns) (* end case *)) (** NOTE: maybe we should just keep track of the max priority? ** What about fairness to fixed priority events (e.g., always, timeout?) **) and extRdy ([], doFns, n) = selectDoFn (doFns, n) () | extRdy (pollFn :: r, doFns, n) = (case pollFn() of ENABLED{prio, doFn} => extRdy (r, (prio, doFn)::doFns, n+1) | _ => extRdy (r, doFns, n) (* end case *)) in S.atomicBegin(); ext (bevs, []) end (* walk the event group tree, collecting the base events (with associated * ack flags), and a list of (cvar * ack flag set) pairs. *) fun collect grp = let val unWrappedFlg = ref false fun gatherWrapped (grp, bl, flgSets) = let fun gather (BASE_GRP bevs, bl, allFlgs, flgSets) = let fun append ([], bl, allFlgs) = (bl, allFlgs) | append (bev::r, bl, allFlgs) = let val flg = ref false in append (r, (bev, flg)::bl, flg::allFlgs) end val (bl', allFlgs') = append (bevs, [], allFlgs) in (bl', allFlgs', flgSets) end | gather (GRP grp, bl, allFlgs, flgSets) = let fun f (grp', (bl', allFlgs', flgSets')) = gather (grp', bl', allFlgs', flgSets') in foldl f (bl, allFlgs, flgSets) grp end | gather (NACK_GRP(cvar, grp), bl, allFlgs, flgSets) = let val (bl', allFlgs', flgSets') = gather (grp, bl, allFlgs, flgSets) in (bl', allFlgs', (cvar, allFlgs') :: flgSets') end val (bl, _, flgSets) = gather (grp, bl, [], flgSets) in (bl, flgSets) end in case grp of (GRP _) => let val unWrappedFlg = ref false fun append ([], bl) = bl | append (bev::r, bl) = append(r, (bev, unWrappedFlg)::bl) fun gather (BASE_GRP bevs, bl, flgSets) = (append(bevs, bl), flgSets) | gather (GRP grp, bl, flgSets) = let fun f (grp', (bl', flgSets')) = gather(grp', bl', flgSets') in foldl f (bl, flgSets) grp end | gather (grp as NACK_GRP _, bl, flgSets) = gatherWrapped (grp, bl, flgSets) in gather (grp, [], []) end | grp => gatherWrapped (grp, [], []) (* end case *) end (* this function handles the more complicated case of synchronization * on groups of events where negative acknowledgements are involved. *) fun syncOnGrp grp = let val (bl, flgSets) = collect grp fun chkCVars () = let fun chkCVar (cvar, flgs) = let fun chkFlgs [] = () | chkFlgs ((ref true)::_) = atomicCVarSet cvar | chkFlgs (_::r) = chkFlgs r in chkFlgs flgs end in app chkCVar flgSets end fun ext ([], blockFns) = capture (fn k => let val escape = escape k val transId = ref(R.TRANS(S.getCurThread())) fun setFlg flg () = ( transId := R.CANCEL; flg := true; chkCVars()) fun log [] = S.atomicDispatch () | log ((blockFn, flg) :: r) = escape (blockFn { transId = transId, cleanUp = setFlg flg, next = fn () => log r }) in log blockFns; error "[log]" end) | ext ((pollFn, flg) :: r, blockFns) = (case pollFn() of ENABLED{prio, doFn} => extRdy (r, [(prio, (doFn, flg))], 1) | (BLOCKED blockFn) => ext (r, (blockFn, flg)::blockFns) (* end case *)) (** NOTE: maybe we should just keep track of the max priority? ** What about fairness to fixed priority events (e.g., always, timeout?) **) and extRdy ([], doFns, n) = let val (doFn, flg) = selectDoFn (doFns, n) in flg := true; chkCVars (); doFn() end | extRdy ((pollFn, flg) :: r, doFns, n) = (case pollFn() of ENABLED{prio, doFn} => extRdy (r, (prio, (doFn, flg))::doFns, n+1) | _ => extRdy (r, doFns, n) (* end case *)) in case bl of [(bev, _)] => syncOnOneEvt bev | _ => (S.atomicBegin(); ext (bl, [])) (* end case *) end fun sync ev = (case (force ev) of (BASE_GRP bevs) => syncOnBEvts bevs | grp => syncOnGrp grp (* end case *)) fun select evts = let fun forceBL ([], bevs) = BASE_GRP bevs | forceBL (evt::r, bevs') = (case (force' evt) of (BASE_GRP bevs) => forceBL (r, bevs @ bevs') | (GRP grp) => forceL (r, grp @ [BASE_GRP bevs']) | grp => forceL (r, [grp, BASE_GRP bevs']) (* end case *)) and forceL ([], [grp]) = grp | forceL ([], l) = GRP l | forceL (evt :: r, l) = ( case (force' evt, l) of (BASE_GRP bevs, BASE_GRP bevs' :: r') => forceL (r, BASE_GRP(bevs @ bevs') :: r') | (GRP grp, l) => forceL (r, grp @ l) | (grp, l) => forceL (r, grp :: l) (* end case *)) and force' (GUARD g) = force' (g ()) | force' (W_NACK f) = let val cvar = R.CVAR(ref(R.CVAR_unset [])) in NACK_GRP(cvar, force' (f (cvarGetEvt cvar))) end | force' (BEVT grp) = BASE_GRP grp | force' (CHOOSE evts) = forceBL (evts, []) in case forceBL(evts, []) of (BASE_GRP bevs) => syncOnBEvts bevs | grp => syncOnGrp grp (* end case *) end end;
root@smlnj-gforge.cs.uchicago.edu | ViewVC Help |
Powered by ViewVC 1.0.0 |