Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] View of /sml/trunk/benchmarks/todo/cml-sieve/concur.sml
ViewVC logotype

View of /sml/trunk/benchmarks/todo/cml-sieve/concur.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 193 - (download) (annotate)
Fri Nov 20 17:43:59 1998 UTC (22 years, 10 months ago) by monnier
File size: 30782 byte(s)
Initial revision
(* concur.sml
 *
 * COPYRIGHT (c) 1990 by John H. Reppy.  See COPYRIGHT file for details.
 *)

functor ConcurML () : INTERNAL_CML =
  struct

  (* we must use the fully polymorphic versions of callcc, etc. *)
    open SMLofNJ.Cont

    val versionName = "Concurrent ML, version 0.9.6, October 11, 1991"

  (* some utility functions that should be inlined *)
    fun reverse ([], rl) = rl
      | reverse (x :: rest, rl) = reverse(rest, x :: rl)
    fun op o (f, g) = fn x => f(g x)
    fun map f l= let
	fun map' ([], l) = reverse(l, [])
	  | map' (x::r, l) = map'(r, (f x)::l)
	in
	  map' (l, [])
	end
    fun revmap f l= let
	fun map' ([], l) = l
	  | map' (x::r, l) = map'(r, (f x)::l)
	in
	  map' (reverse(l, []),  [])
	end
    fun a @ [] = a
      | a @ b = let
	  fun append ([], l) = reverse(l, b)
	    | append (x::r, l) = append(r, x::l)
	  in
	    append(a, [])
	  end
    fun op before (x, y) = x

  (* queues *)
    datatype 'a queue_t = Q of {front : 'a list ref, rear : 'a list ref}
  (* create a new queue *)
    fun queueNew () = Q{front = ref [], rear = ref []}
  (* queue insert *)
    fun queueInsc (Q{rear, ...}) = fn x => (rear := x :: !rear)
    fun queueIns ((Q{rear, ...}), x) = (rear := x :: !rear)
  (* remove the head of the queue *)
    exception EmptyQ
    fun queueRem (Q{front = ref [], rear = ref []}) = raise EmptyQ
      | queueRem (Q{front = front as (ref []), rear = rear as (ref l)}) = let
	  val (x::r) = reverse(l, [])
	  in
	    front := r; rear := []; x
	  end
      | queueRem (Q{front = front as (ref(x::r)), ...}) = (front := r; x)

  (* report an internal error to std_err *)
    fun reportError msg = let
	  val s = if (ord(msg) = ord("\n"))
	  	then concat["\nCML: ", substring(msg, 1, (size msg)-1), "\n"]
		else concat["CML: ", msg, "\n"]
	  in
	    System.Signals.maskSignals true;
	    IO.output(IO.std_err, s);
	    System.Signals.maskSignals false
	  end

    exception InternalError
    fun error s = (reportError("\nINTERNAL ERROR: "^s); raise InternalError)

  (* timers *)
    structure T : sig
	val earlier : Time.time * Time.time -> bool
	val add_time : Time.time * Time.time -> time
	val zeroTime : Time.time
        val currentTime : unit -> Time.time
	val timerOff : unit -> unit
	val timerOn : Time.time option -> unit
	val restartTimer : unit -> unit
      end = struct
	val earlier = Time.<
	val add_time = Time.+
	val zeroTime = Time.zeroTime
	fun currentTime () = Time.now
        val saveTime = ref (NONE : time option)
	fun timerOff () = setitimer (0, zeroTime, zeroTime)
	fun timerOn t = (
	      saveTime := t;
	      case t of (SOME tq) => setitimer (0, tq, tq) | _ => ())
	fun restartTimer () = 
	      case !saveTime of (SOME tq) => setitimer (0, tq, tq) | _ => ()
      end
    open T


  (* the termination function *)
    val shutdown = ref (fn () => ())


  (* Per-thread descriptors *)
    datatype thread_id = TID of {  (* thread ids *)
	id	   : int,
	death_cond : unit cond_var
      }
  (* condition variables *)
    and 'a cond_var = COND of 'a cond_state ref
    and 'a cond_state
      = COND_unset of (thread_id * bool ref * 'a cont) list
      | COND_set of 'a

  (* channels *)
    datatype 'a chan = CHAN of {
	inq	: (thread_id * 'a cont) chanq,
	outq	: (thread_id * 'a * unit cont) chanq
      }
      withtype 'a chanq = (bool ref * 'a) queue_t

  (* events *)
    datatype abort_fn = NO_ABORT | ABORT of (unit -> unit)
    datatype 'a base_evt = BASE_EVT of {
	  pollfn : unit -> bool,
	  dofn : abort_fn -> 'a,
	  blockfn : (bool ref * abort_fn * (unit -> unit)) -> 'a,
	  abortfn : abort_fn
	}
    datatype 'a event
      = EVT of ('a base_evt list * bool)    (* the boolean is true if one of the *)
					    (* base events has an abort action *)
      | GUARD of (unit -> 'a event)

    fun mkBaseEvt arg = EVT([BASE_EVT arg], false)

    fun applyAbortFn NO_ABORT = ()
      | applyAbortFn (ABORT a) = a()

  (** Thread id creation **)
    val nextId = ref 0
    fun newId () = let val id = !nextId
	  in
	    nextId := id + 1;
	    TID{id = id, death_cond = COND(ref(COND_unset[]))}
	  end
  (* the current thread is represented using the "var" register *)
    val getCurThread : unit -> thread_id = System.Unsafe.getvar
    val setCurThread : thread_id -> unit = System.Unsafe.setvar
    val getTid = getCurThread

  (* the thread ready queue *)
    val rdyQ : (thread_id * unit cont) queue_t = queueNew()
  (* enqueue a ready thread *)
    val enqueue = queueInsc rdyQ
  (* enqueue the current thread *)
    fun enqueueCurThread resume = (enqueue(getTid(), resume))
  (* add the current thread to the ready queue, and return the next one *)
    fun switchCurThread k = let val curP = (getTid(), k)
	  in
	    case rdyQ
	     of Q{front=ref [], rear=ref []} => curP
	      | Q{front=front as (ref []), rear} => let
		  val (x::r) = reverse(!rear, [curP])
		  in
		    front := r; rear := []; x
		  end
	      | Q{front=front as ref (x::r), rear} => (
		  front := r; rear := (curP :: !rear); x)
	  end


  (** I/O wait queues **)
    fun pollFDs (rdfds, wrfds, blocking) = let
	  val t = if blocking then NONE else (SOME zeroTime)
	  val (rd, wr, _) = System.Unsafe.SysIO.select(rdfds, wrfds, [], t)
	  in
	    (rd, wr)
	  end
    (* The list of I/O wait events *)
      datatype io_operation_t = IO_RD | IO_WR
      type io_item = {
	  fd	   : int,		(* the file descriptor *)
	  io_op	   : io_operation_t,	(* the operation being waited for *)
	  kont	   : unit cont,		(* the synchronization continuation and *)
	  id	   : thread_id,		(* the id of the waiting thread *)
	  err_kont : unit cont,		(* the error continuation of the thread *)
	  dirty	   : bool ref		(* the dirty bit *)
	}
      val ioWaitList = ref ([] : io_item list)

    (* project the list of read fds and list of write fds from the I/O wait list. *)
      fun projIO () = let
	    fun f ([] : io_item list, rd, wr) = (rd, wr)
	      | f ({dirty = ref true, ...}::r, rd, wr) = f(r, rd, wr)
	      | f ({io_op = IO_RD, fd, ...}::r, rd, wr) = f(r, fd::rd, wr)
	      | f ({io_op = IO_WR, fd, ...}::r, rd, wr) = f(r, rd, fd::wr)
	    in
	      f(!ioWaitList, [], [])
	    end

    (* check for available I/O operations *)
      fun checkIO shouldBlock = (case projIO()
	     of ([], []) => ()
	      | (rd, wr) => (case pollFDs(rd, wr, shouldBlock)
		   of ([], []) => ()
		    | (rd, wr) => let
			fun f ([], l) = l
			  | f (({dirty = ref true, ...} : io_item)::r, l) = f (r, l)
			  | f ((x as {io_op, fd, kont, id, dirty, ...})::r, l) = let
			      fun look [] = false
				| look (x::r) = if (x = fd)
				    then (
				      enqueue(id, kont);
				      dirty := true;
				      true)
				    else (look r)
			      in
				if (look(case io_op of IO_RD => rd | IO_WR => wr))
				  then f(r, l)
				  else f(r, x::l)
			      end
			in
			  ioWaitList := f(!ioWaitList, [])
			end
		  (* end case *))
	    (* end case *))
	    handle (System.Unsafe.CInterface.SystemCall _) => let
	      open System.Unsafe.SysIO
	      fun testDesc fd = (ftype(DESC fd); false) handle _ => true
	      fun findBadDescs ([], l) = l
		| findBadDescs ((x as {fd, dirty, err_kont, id, ...} : io_item)::r, l) =
		    if (testDesc fd)
		      then (
			enqueue(id, err_kont);
			dirty := true;
			findBadDescs (r, l))
		      else findBadDescs (r, x::l)
	      in
		ioWaitList := findBadDescs(!ioWaitList, []);
		checkIO shouldBlock
	      end

    (* return true if there is at least one clean I/O wait event on the list *)
      fun waitingForIO () = let
	    fun f (l as (({dirty = ref true, ...}::r)) : io_item list) = (f r)
	      | f l = l
	    in
	      case f(!ioWaitList)
	       of [] => (ioWaitList := []; false)
		| l => (ioWaitList := l; true)
	    end


  (** Timer waiting queues **)
    datatype time_wait_t = TIMEWAIT of {
	wait_time : time,
	id : thread_id,
	kont : unit cont,
	dirty : bool ref
      }
    val timeWaitList = ref ([] : time_wait_t list)

  (* insert a timeout event *)
    fun insTimeWait (tim, id, k, flg) = let
	  val item = TIMEWAIT{wait_time=tim, id=id, kont=k, dirty=flg}
	  fun scan [] = [item]
	    | scan ((t as TIMEWAIT{dirty = ref true, ...})::r) = scan r
	    | scan (l as ((t as TIMEWAIT{wait_time, ...})::r)) =
		if (earlier (tim, wait_time))
		  then (item::l)
		  else (t::(scan r))
	  in
	    timeWaitList := scan(!timeWaitList)
	  end

  (* schedule any threads waiting for times earlier than the current time. *)
    fun remTimeWait () = let
	  val tim = currentTime()
	  fun scan [] = []
	    | scan (l as ((t as TIMEWAIT{dirty = ref true, ...})::r)) = scan r
	    | scan (l as ((t as TIMEWAIT{wait_time, id, kont, dirty})::r)) =
		if earlier(tim, wait_time)
		  then l
		  else (enqueue(id, kont); dirty := true; scan r)
	  in
	    timeWaitList := scan(!timeWaitList)
	  end

  (* return true if there is at least one clean timeout event on the list *)
    fun waitingForTimeout () = let
	  fun f (TIMEWAIT{dirty = ref true, ...}::r) = (f r)
	    | f l = l
	  in
	    case (f (!timeWaitList))
	     of [] => (timeWaitList := []; false)
	      | l => (timeWaitList := l; true)
	  end


  (** Process scheduling (and atomic regions) **)

  (*  test for blocked threads that could conceivably become unblocked *)
    fun checkWaitingThreads () = (
	 case (!ioWaitList) of [] => () | _=> (checkIO false);
	 case (!timeWaitList) of [] => () | _=> remTimeWait ())

  (* atomic regions (just SIGALRM for now) *)
    local
      open System.Signals
      datatype atomic_state = NonAtomic | Atomic | SignalPending
      val atomicState = ref NonAtomic
      fun inAtomicRegion () = (case !atomicState of NonAtomic => false | _ => true)
      fun signalPending () = (case !atomicState of SignalPending => true | _ => false)
    in
  (* begin an atomic region *)
    fun atomicBegin () = (atomicState := Atomic)
  (* Switch control to a thread, while leaving the atomic region *)
    fun atomicSwitchToThread (id, kont) = (
	  setCurThread id;
	  atomicState := NonAtomic;
	  throw kont ())
  (* end an atomic region *)
    fun atomicEnd () = if signalPending()
	  then (
	    checkWaitingThreads();
	    callcc (fn k => (atomicSwitchToThread (switchCurThread k))))
	  else atomicState := NonAtomic
  (* dispatch a thread while exiting an atomic region *)
    fun atomicDispatch () = let
	  val _ = if signalPending() then checkWaitingThreads() else ();
	  (* wait for I/O or delay when there are no ready threads. *)
	  fun waitForSomething () = (case (waitingForIO(), waitingForTimeout())
	       of (false, false) => (!shutdown)()
		| (_, false) => (timerOff(); checkIO true; restartTimer())
		| _ => (System.Signals.pause(); checkWaitingThreads()))
	  fun dequeue () = (case rdyQ
	       of (Q{front = ref [], rear = ref []}) => (waitForSomething(); dequeue())
		| (Q{front = front as (ref []), rear = rear as (ref l)}) => let
		    val (x::r) = reverse(l, [])
		    in
		      front := r; rear := []; x
		    end
		| (Q{front = front as (ref(x::r)), ...}) => (front := r; x))
	  in
	    atomicSwitchToThread (dequeue())
	  end
    fun dispatch () = (atomicBegin(); atomicDispatch())
  (* throw to a continuation while exiting an atomic region *)
    fun atomicThrow (k, x)= if signalPending()
	  then (
	    checkWaitingThreads();
	    callcc (fn k1 => (atomicSwitchToThread(switchCurThread k1)));
	    throw k x)
	  else (
	    atomicState := NonAtomic;
	    throw k x)
  (* initialize the atomic region support *)
    fun initAtomic () = let
	  val checkIOKont = callcc (fn k1 => (
		callcc (fn k2 => (throw k1 k2));
		(* NOTE: we start in an atomic region *)
		checkWaitingThreads(); atomicDispatch()))
	  fun alrm_handler (_, k) = if inAtomicRegion()
		then (atomicState := SignalPending;  k)
		else (enqueueCurThread k; atomicBegin(); checkIOKont)
	  in
	    setHandler (SIGALRM, SOME alrm_handler);
	    atomicState := NonAtomic
	  end
    end (* local *)

  (* return the # of threads created and the length of the ready queue. *)
    fun load () = let
	  val _ = atomicBegin()
	  val Q{front, rear} = rdyQ
	  val res = (!nextId, List.length(!front) + List.length(!rear))
	  in
	    atomicEnd();  res
	  end


  (** Condition variables **)
    exception WriteTwice
    fun condVar () = COND(ref(COND_unset[]))
    fun writeVar (COND rc, x) = (
	  atomicBegin();
	  case (! rc)
	   of (COND_unset pl) => let
		fun f [] = ()
		  | f ((_, ref true, _) :: r) = (f r)
		  | f ((id, flg, kont) :: r) = (
		      enqueue (id, callcc (fn k1 => (
			callcc (fn k2 => throw k1 k2);
			throw kont x)));
		      flg := true;
		      f r)
		in
		  rc := (COND_set x);
		  f pl;
		  atomicEnd ()
		end
	    | _ => (atomicEnd(); raise WriteTwice)
	  (* end case *))
    fun addCondWaiter ([], w) = [w]
      | addCondWaiter ((_, ref true, _)::r, w) = addCondWaiter (r, w)
      | addCondWaiter (x::r, w) = x::(addCondWaiter(r, w))
    fun readVar (COND rc) = (
	  atomicBegin();
	  case !rc
	   of (COND_set x) => (atomicEnd(); x)
	    | (COND_unset pl) => callcc (fn k => (
		rc := COND_unset(addCondWaiter(pl, (getTid(), ref false, k)));
		atomicDispatch()))
	  (* end case *))
    fun readVarEvt (COND rc) = mkBaseEvt {
	    pollfn = fn () => (case !rc of (COND_set _) => true | _ => false),
	    dofn = fn abortfn => (
	      case !rc
	       of (COND_set x) => (atomicEnd(); applyAbortFn abortfn; x)
		| _ => error "[readVarEvt.dofn]"),
	    blockfn = fn (dirty, abortfn, next) => let
	      fun block k = (case !rc
		   of (COND_unset pl) => (
			rc := COND_unset(addCondWaiter(pl, (getTid(), dirty, k)));
			next(); error "[readVarEvt]")
		    | _ => error "[readVarEvt.blockfn]")
	      in
		case abortfn
		 of NO_ABORT => (callcc block)
		  | (ABORT f) => ((callcc block) before (f ()))
	      end,
	    abortfn = NO_ABORT
	  }


  (** Channel operations **)
    fun insert (q : 'a chanq, flg, item) = queueIns(q, (flg, item))
    fun remove (q : 'a chanq) = let
	  val (flg, item) = queueRem q
	  in
	    flg := true; item
	  end

  (* Clean a channel of satisfied transactions.	 We do this incrementally to
   * give an amortized constant cost.  Basically we guarantee that the front
   * of the queue will be unsatisfied.	Return true if the resulting queue
   * is non-empty.
   *)
    local
      fun clean' [] = []
	| clean' ((ref true, _) :: r) = clean' r
	| clean' l = l
    in
    fun clean ((Q{front, rear}) : 'a chanq) = (case (front, rear)
	 of (ref [], ref []) => false
	  | (ref [], ref r) => (case clean'(reverse(r, []))
	     of [] => (rear := []; false)
	      | l => (front := l; rear := []; true))
	  | (ref f, ref r) => (case (clean' f)
	     of [] => (case clean'(reverse(r, []))
		 of [] => (front := []; rear := []; false)
		  | l => (front := l; rear := []; true))
	      | l => (front := l; true)))
    fun cleanAndRemove ((Q{front, rear}) : 'a chanq) = (case (front, rear)
	 of (ref [], ref []) => NONE
	  | (ref [], ref r) => (case clean'(reverse(r, []))
	     of [] => (rear := []; NONE)
	      | ((flg, item)::rest) => (
		  front := rest; rear := []; flg := true; SOME item))
	  | (ref f, ref r) => (case (clean' f)
	     of [] => (case clean'(reverse(r, []))
		 of [] => (front := []; rear := []; NONE)
		  | ((flg, item)::rest) => (
		      front := rest; rear := []; flg := true; SOME item))
	      | ((flg, item)::rest) => (front := rest; flg := true; SOME item)))
    end (* local *)

  (* remove any waiting threads from a channel's queues *)
    fun resetChan (CHAN{inq=Q{front=f1, rear=r1}, outq=Q{front=f2, rear=r2}}) = (
	  f1 := []; r1 := []; f2 := []; r2 := [])

  (* channel : unit -> '1a chan *)
    fun channel () = CHAN{inq = queueNew(), outq = queueNew()}

  (* sameChannel : ('a chan * 'a chan) -> bool *)
    fun sameChannel (CHAN{inq=Q{front=f1, ...}, ...}, CHAN{inq=Q{front=f2, ...}, ...}) =
	  (f1 = f2)

  (* send : ('a chan * 'a) -> unit *)
    fun send (CHAN{inq, outq}, msg) = callcc (fn send_k => (
	  atomicBegin();
	  case (cleanAndRemove inq)
	   of SOME(rid, rkont) => (
		enqueueCurThread send_k;
		setCurThread rid;
		atomicThrow (rkont, msg))
	    | NONE => (
		insert(outq, ref false, (getTid(), msg, send_k));
		atomicDispatch())
	  (* end case *)))
    fun sendc ch msg = send (ch, msg)

  (* accept : 'a chan -> 'a *)
    fun accept (CHAN{inq, outq}) = callcc (fn accept_k => (
	  atomicBegin();
	  case (cleanAndRemove outq)
	   of SOME(sid, msg, skont) => (
		enqueue (sid, skont);
		atomicThrow (accept_k, msg))
	    | NONE => (
		insert(inq, ref false, (getTid(), accept_k));
		atomicDispatch())
	  (* end case *)))

  (* transmit : ('a chan * 'a) -> unit event *)
    fun transmit (CHAN{inq, outq}, msg) = let
	  fun pollFn () = (clean inq)
	  fun doFn abortfn = let
		val (rid, rkont) = remove inq
		fun doit k = (
		      enqueueCurThread k;
		      setCurThread rid;
		      atomicThrow(rkont, msg))
		in
		  case abortfn
		   of NO_ABORT => callcc doit
		    | (ABORT f) => (callcc doit; f())
		end
	  fun blockFn (flg, abortfn, next) = let
		fun block k = (
		      clean outq;
		      insert(outq, flg, (getTid(), msg, k));
		      next(); error "[transmit]")
		in
		  case abortfn
		   of NO_ABORT => (callcc block)
		    | (ABORT f) => (callcc block; f())
		end
	  in
	    mkBaseEvt {
		pollfn = pollFn, dofn = doFn, blockfn = blockFn, abortfn = NO_ABORT
	      }
	  end
    fun transmitc ch msg = transmit (ch, msg)

  (* receive : 'a chan -> 'a event *)
    fun receive (CHAN{inq, outq}) = let
	  fun pollFn () = (clean outq)
	  fun doFn abortfn = let
		val (sid, msg, skont) = remove outq
		in
		  enqueue (sid, skont);
		  atomicEnd ();
		  applyAbortFn abortfn;
		  msg
		end
	  fun blockFn (flg, abortfn, next) = let
		fun block k = (
		      clean inq;
		      insert(inq, flg, (getTid(), k));
		      next(); error "[receive]")
		in
		  case abortfn
		   of NO_ABORT => (callcc block)
		    | (ABORT f) => ((callcc block) before (f ()))
		end
	  in
	    mkBaseEvt {
		pollfn = pollFn, dofn = doFn, blockfn = blockFn, abortfn = NO_ABORT
	      }
	  end

  (* A channel to pass errors on to an error monitor *)
    val errCh : (thread_id * exn) chan = channel()


  (** Thread operations **)
    fun notify () = let
	  val (TID{id, death_cond, ...}) = getTid()
	  in
	    writeVar (death_cond, ())
	  end
    fun spawnc f x = let
	  val _ = atomicBegin()
	  val id = newId()
	  in
	    callcc (fn parent_k => (
	      enqueueCurThread parent_k;
	      setCurThread id;
	      atomicEnd();
	      (f x; notify())
		handle ex => (notify (); send (errCh, ((getTid()), ex)));
	      dispatch ()));
	    id
	  end
    fun spawn f = spawnc f ()

    fun fastSpawn f = let
	  val _ = atomicBegin()
	  val id = newId()
	  in
	    callcc (fn k => (
	      callcc (fn k' => (enqueue (id, k'); atomicThrow (k, ())));
	      (f (); notify())
		handle ex => (notify (); send (errCh, (getTid(), ex)));
	      dispatch ()))
	  end
    val spawnList = app fastSpawn

  (* terminate the current thread *)
    fun exit () = (notify(); dispatch())

    fun threadWait (TID{death_cond, ...}) = readVarEvt death_cond

  (* test thread_ids for equality *)
    fun sameThread (TID{id=p1, ...}, TID{id=p2, ...}) = (p1 = p2)
  (* test the order of thread ids *)
    fun tidLessThan (TID{id=p1, ...}, TID{id=p2, ...}) = (p1 < p2)
  (* return a string representation of a thread id *)
    fun tidToString (TID{id, ...}) = concat ["[", makestring id, "]"]

  (* yield control to the next thread *)
    fun yield () = callcc (fn k => (
	  atomicBegin(); enqueueCurThread k; atomicDispatch()))


  (** Event operations **)
    fun dummyFn _ = error "[dummyFn]"

  (* always : 'a -> 'a event *)
    fun always x = mkBaseEvt {
	    pollfn = (fn () => true),
	    dofn = (fn abort => (applyAbortFn abort; x)),
	    blockfn = dummyFn,
	    abortfn = NO_ABORT
	  }
    val ALWAYS = always ()

  (* 'a event list -> 'a event *)
    fun choose l = let
	  fun f ([], el, [], flg) = EVT(el, flg)
	  (* note that the guard list gl is in reverse order *)
	    | f ([], el, gl, flg) =
		GUARD(fn () =>
		  choose (EVT(el, flg) :: (revmap (fn g => (g ())) gl)))
	    | f (EVT(el', false) :: r, el, gl, hasAbort) =
		f (r, el' @ el, gl, hasAbort)
	    | f (EVT(el', true) :: r, el, gl, _) = f (r, el' @ el, gl, true)
	    | f ((GUARD g)::r, el, gl, hasAbort) = f (r, el, g::gl, hasAbort)
	  in
	    f (l, [], [], false)
	  end

  (* guard : (unit -> 'a event) -> 'a event *)
    val guard = GUARD

  (* wrap : ('a event * ('a -> 'b)) -> 'b event *)
    fun wrap (GUARD g, f) = GUARD(fn () => wrap(g (), f))
      | wrap (EVT(el, flg), f) = let
	  fun wrapEvts ([], l) = l
	    | wrapEvts ((BASE_EVT{pollfn, dofn, blockfn, abortfn})::r, l) = let
		val bev = BASE_EVT{
			pollfn = pollfn,
			dofn = (f o dofn),
			blockfn = (f o blockfn),
			abortfn = abortfn
		      }
		in
		  wrapEvts(r, bev::l)
		end
	  in
	    EVT(wrapEvts (el, []), flg)
	  end

  (* wrapHandler : ('a event * (exn -> 'a)) -> 'a event *)
    fun wrapHandler (GUARD g, h) =GUARD(fn () => wrapHandler(g (), h))
      | wrapHandler (EVT(el, flg), h) = let
	  fun wh f x = (f x) handle e => (h e)
	  fun wrapEvts ([], l) = l
	    | wrapEvts ((BASE_EVT{pollfn, dofn, blockfn, abortfn})::r, l) = let
		val bev = BASE_EVT{
			pollfn = pollfn,
			dofn = (wh dofn),
			blockfn = (wh blockfn),
			abortfn = abortfn
		      }
		in
		  wrapEvts(r, bev::l)
		end
	  in
	    EVT(wrapEvts (el, []), flg)
	  end

  (* wrapAbort : (a event * (unit -> unit)) -> 'a event *)
    fun wrapAbort (GUARD g, abort) = GUARD(fn () => wrapAbort (g (), abort))
      | wrapAbort (EVT(el, flg), abort) = let
	  fun addAbortFn (BASE_EVT{pollfn, dofn, blockfn, abortfn}, abort) = BASE_EVT{
		  pollfn = pollfn,
		  dofn = dofn,
		  blockfn = blockfn,
		  abortfn = (case abortfn
		     of NO_ABORT => ABORT abort
		      | (ABORT a) => ABORT(fn () => (fastSpawn abort; a())))
		}
	  in
	    case el
	     of [] => EVT([], false)
	      | [bev] => EVT([addAbortFn (bev, abort)], true)
	      | (leader :: followers) => let
		  val n = length followers
		  in
		    GUARD (fn () => let
		      val ackCh = channel()
		      fun followerAbort () = send(ackCh, ())
		      fun leaderAbort 0 = abort()
		        | leaderAbort i = (accept ackCh; leaderAbort(i-1))
		      in
			EVT(
			  addAbortFn(leader, fn () => (leaderAbort n)) ::
			    (map (fn b => addAbortFn(b, followerAbort)) followers),
			  true)
		      end)
		  end
	  end

  (** Sync and poll **)
    local
    (* Generate index numbers for "non-deterministic" selection.  We use a
     * round-robin style policy. *)
      val cnt = ref 0
      fun random 1 = 0
	| random i = let val j = !cnt
	    in
	      cnt := Bits.andb(j+1, 0x7fff); (j rem i)
	    end
      fun selectDoFn (el, n) = let
	    fun sel (f::_, 0) = f NO_ABORT
	      | sel (_::r, n) = sel (r, n-1)
	    in
	      sel (el, random n)
	    end

    fun syncOnEvts ([], _) = exit()
      | syncOnEvts ([BASE_EVT{pollfn, dofn, blockfn, ...}], _) = (
	    atomicBegin();
	    if (pollfn ())
	      then dofn NO_ABORT
	      else blockfn (ref false, NO_ABORT, atomicDispatch))
      | syncOnEvts (bevs, false) = let
	  fun ext ([], blockFns) = capture (fn k => let
		val escape = escape k
		val dirtyFlg = ref false
		fun log [] = atomicDispatch ()
		  | log (blockfn :: r) = escape (
		      blockfn (dirtyFlg, NO_ABORT, fn () => log r))
		in
		  log blockFns; error "[log]"
		end)
	    | ext (BASE_EVT{pollfn, dofn, blockfn, ...} :: r, blockFns) =
		if (pollfn ())
		  then extRdy (r, [dofn], 1)
		  else ext (r, blockfn::blockFns)
	  and extRdy ([], doFns, n) = selectDoFn (doFns, n)
	    | extRdy (BASE_EVT{pollfn, dofn, ...} :: r, doFns, n) =
		if (pollfn ())
		  then extRdy (r, dofn::doFns, n+1)
		  else extRdy (r, doFns, n)
	  in
	    atomicBegin();
	    ext (bevs, [])
	  end
      | syncOnEvts (bevs, true) = let
	  datatype 'a bevt_status
	    = BLK of 'a block_fn_t
	    | BLK_ABORT of ('a block_fn_t * (unit -> unit))
	    | RDY of (abort_fn -> 'a)
	    | RDY_ABORT of ((abort_fn -> 'a) * (unit -> unit))
	  withtype 'a block_fn_t = (bool ref * abort_fn * (unit -> unit)) -> 'a
	  fun ext ([], sts) = let
		fun projAbortFns [] = []
		  | projAbortFns (BLK_ABORT(_, abort)::r) = abort :: (projAbortFns r)
		  | projAbortFns (_::r) = projAbortFns r
		val abortFns = projAbortFns sts
		val allAborts = ABORT(fn () => spawnList abortFns)
		fun mkAbortFn i = let
		      fun abort ([], _) = ()
			| abort (a::r, j) = (
			    if (i <> j) then (fastSpawn a; ()) else ();
			    abort (r, j+1))
		      in
			ABORT(fn () => abort(abortFns, 0))
		      end
		in
		  capture (fn k => let
		    val escape = escape k
		    val dirtyFlg = ref false
		    fun log ([], _) = atomicDispatch ()
		      | log ((BLK bfn) :: r, i) = escape (
			  bfn (dirtyFlg, allAborts,
			    fn () => (log(r, i); error "[log]")))
		      | log ((BLK_ABORT(bfn, _)) :: r, i) = escape (
			  bfn (dirtyFlg, mkAbortFn i,
			    fn () => (log(r, i+1); error "[log]")))
		      | log _ = error "[log]"
		    in
		      log (sts, 0)
		    end)
		end
	    | ext (BASE_EVT{pollfn, dofn, blockfn, abortfn} :: r, sts) = (
		case (pollfn(), abortfn)
		 of (false, NO_ABORT) => ext (r, (BLK blockfn)::sts)
		  | (false, ABORT a) => ext (r, (BLK_ABORT(blockfn, a))::sts)
		  | (true, NO_ABORT) => extRdy (r, (RDY dofn)::sts, 1)
		  | (true, ABORT a) => extRdy (r, (RDY_ABORT(dofn, a))::sts, 1)
		(* end case *))
	  and extRdy ([], sts, nRdy) = let
		fun selAndAbortRest ([], _, _) = error "[selAndAbortRest]"
		  | selAndAbortRest ((BLK _)::r, i, abortFns) =
		      selAndAbortRest (r, i, abortFns)
		  | selAndAbortRest ((BLK_ABORT(_, abort))::r, i, abortFns) =
		      selAndAbortRest (r, i, abort::abortFns)
		  | selAndAbortRest ((RDY doFn)::r, 0, abortFns) =
		      abortRest (r, abortFns, doFn)
		  | selAndAbortRest ((RDY _)::r, i, abortFns) =
		      selAndAbortRest (r, i-1, abortFns)
		  | selAndAbortRest ((RDY_ABORT(doFn, _))::r, 0, abortFns) =
		      abortRest (r, abortFns, doFn)
		  | selAndAbortRest ((RDY_ABORT(_, abort))::r, i, abortFns) =
		      selAndAbortRest (r, i-1, abort::abortFns)
		and abortRest ([], abortFns, doFn) =
		      doFn (ABORT(fn () => spawnList abortFns))
		  | abortRest ((BLK_ABORT(_, abort))::r, abortFns, doFn) =
		      abortRest (r, abort::abortFns, doFn)
		  | abortRest ((RDY_ABORT(_, abort))::r, abortFns, doFn) =
		      abortRest (r, abort::abortFns, doFn)
		  | abortRest (_::r, abortFns, doFn) =
		      abortRest (r, abortFns, doFn)
		in
		  selAndAbortRest (sts, random nRdy, [])
		end
	    | extRdy (BASE_EVT{pollfn, dofn, blockfn, abortfn} :: r, sts, nRdy) = (
		case (pollfn(), abortfn)
		 of (false, NO_ABORT) => extRdy (r, sts, nRdy)
		  | (false, ABORT a) => extRdy (r, (BLK_ABORT(blockfn, a))::sts, nRdy)
		  | (true, NO_ABORT) => extRdy (r, (RDY dofn)::sts, nRdy+1)
		  | (true, ABORT a) => extRdy (r, (RDY_ABORT(dofn, a))::sts, nRdy+1)
		(* end case *))
	  in
	    atomicBegin();
	    ext (bevs, [])
	  end

    fun pollEvts ([], _) = NONE
      | pollEvts ([BASE_EVT{pollfn, dofn, ...}], _) = (
	    atomicBegin();
	    if (pollfn ()) then SOME(dofn NO_ABORT) else (atomicEnd(); NONE))
      | pollEvts (bevs, false) = let
	  fun ext [] = (atomicEnd(); NONE)
	    | ext (BASE_EVT{pollfn, dofn, ...} :: r) =
		if (pollfn ()) then extRdy(r, [dofn], 1) else ext r
	  and extRdy ([], doFns, n) = SOME(selectDoFn (doFns, n))
	    | extRdy (BASE_EVT{pollfn, dofn, ...} :: r, doFns, n) =
		if (pollfn ())
		  then extRdy (r, dofn::doFns, n+1)
		  else extRdy (r, doFns, n)
	  in
	    atomicBegin();
	    ext bevs
	  end
      | pollEvts (bevs, true) = let
	  datatype 'a bevt_status
	    = BLK_ABORT of (unit -> unit)
	    | RDY of (abort_fn -> 'a)
	    | RDY_ABORT of ((abort_fn -> 'a) * (unit -> unit))
	  fun doAndAbort (sts, i) = let
		fun f ([], abortFns, _, NONE) = (spawnList abortFns; NONE)
		  | f ([], abortFns, _, SOME doFn) =
			SOME (doFn (ABORT(fn () => spawnList abortFns)))
		  | f ((BLK_ABORT abort) :: r, abortFns, i, doFn) =
			f (r, abort::abortFns, i, doFn)
		  | f ((RDY doFn) :: r, abortFns, 0, _) =
			f (r, abortFns, i, SOME doFn)
		  | f ((RDY _) :: r, abortFns, i, doFn) =
			f (r, abortFns, i-1, doFn)
		  | f ((RDY_ABORT(doFn, _)) :: r, abortFns, 0, _) =
			f (r, abortFns, i, SOME doFn)
		  | f ((RDY_ABORT(_, abort)) :: r, abortFns, i, doFn) =
			f (r, abort::abortFns, i, doFn)
		in
		  f (sts, [], i, NONE)
		end
	  fun ext ([], sts) = doAndAbort (sts, ~1)
	    | ext (BASE_EVT{pollfn, dofn, abortfn, ...} :: r, sts) = (
		case (pollfn (), abortfn)
		 of (false, NO_ABORT) => ext (r, sts)
		  | (false, ABORT a) => ext (r, (BLK_ABORT a)::sts)
		  | (true, NO_ABORT) => extRdy (r, (RDY dofn)::sts, 1)
		  | (true, ABORT a) => extRdy (r, (RDY_ABORT(dofn, a))::sts, 1)
		(* end case *))
	  and extRdy ([], sts, n) = doAndAbort (sts, random n)
	    | extRdy (BASE_EVT{pollfn, dofn, abortfn, ...} :: r, sts, n) = (
		case (pollfn (), abortfn)
		 of (false, NO_ABORT) => extRdy (r, sts, n)
		  | (false, ABORT a) => extRdy (r, (BLK_ABORT a)::sts, n)
		  | (true, NO_ABORT) => extRdy (r, (RDY dofn)::sts, n+1)
		  | (true, ABORT a) => extRdy (r, (RDY_ABORT(dofn, a))::sts, n+1)
		(* end case *))
	  in
	    atomicBegin();
	    ext (bevs, [])
	  end

    fun evalGuard (GUARD g) = evalGuard (g ())
      | evalGuard (EVT arg) = arg
    in
    fun sync evt = syncOnEvts (evalGuard evt)
    val select = sync o choose
    fun poll evt = pollEvts (evalGuard evt)
    end (* local *)

  (* waitUntil : time -> unit event *)
    fun waitUntil (t as TIME{sec, usec}) = let
	  val curTime = currentTime()
	  in
	    if earlier(curTime, t)
	      then let
	        fun pollFn () = false
		fun blockFn (flg, abort, next) = let
		      fun block k = (
			    insTimeWait(t, getTid(), k, flg);
			    next())
		      in
			case abort
			 of NO_ABORT => (callcc block)
			  | (ABORT f) => (callcc block; f ())
		      end
		in
		  mkBaseEvt {
		      pollfn = pollFn, dofn = dummyFn, blockfn = blockFn,
		      abortfn = NO_ABORT
		    }
		end
	      else ALWAYS
	  end

  (* timeout : time -> unit event *)
    fun timeout (t as TIME{sec, usec}) = if earlier(t, zeroTime)
	  then ALWAYS
	  else let
	    fun pollFn () = false
	    fun blockFn (flg, abort, next) = let
		  fun block k = (
			insTimeWait(add_time (currentTime(), t), getTid(), k, flg);
			next())
		  in
		    case abort
		     of NO_ABORT => (callcc block)
		      | (ABORT f) => (callcc block; f ())
		  end
	    in
	      mkBaseEvt {
		  pollfn = pollFn, dofn = dummyFn, blockfn = blockFn,
		  abortfn = NO_ABORT
		}
	    end


  (** Initialization **)

  (* initialize the internal queues, I/O waiting lists and counters *)
    fun initQueues () = let val Q{front, rear} = rdyQ
	  in
	    front := []; rear := [];
	    ioWaitList := [];
	    timeWaitList := [];
	    nextId := 0;
	    setCurThread (newId());
	    initAtomic()
	  end

  end (* functor ConcurML *)

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0