Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cml/src/core-cml/sync-var.sml
ViewVC logotype

Annotation of /sml/trunk/src/cml/src/core-cml/sync-var.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1924 - (view) (download)

1 : monnier 2 (* sync-var.sml
2 :     *
3 :     * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
4 :     * COPYRIGHT (c) 1989-1991 John H. Reppy
5 :     *
6 :     * The implementation of Id-style synchronizing memory cells.
7 :     *)
8 :    
9 :     structure SyncVar :> SYNC_VAR =
10 :     struct
11 :    
12 :     structure R = RepTypes
13 :     structure S = Scheduler
14 :    
15 :     type 'a cont = 'a SMLofNJ.Cont.cont
16 :     val callcc = SMLofNJ.Cont.callcc
17 :     val throw = SMLofNJ.Cont.throw
18 :    
19 :     (* the underlying representation of both ivars and mvars is the same. *)
20 :     datatype 'a cell = CELL of {
21 :     priority : int ref,
22 :     readQ : (R.trans_id ref * 'a cont) Q.queue,
23 :     value : 'a option ref
24 :     }
25 :    
26 :     type 'a ivar = 'a cell
27 :     type 'a mvar = 'a cell
28 :    
29 :     exception Put
30 :    
31 :     fun newCell () = CELL{priority = ref 0, readQ = Q.queue(), value=ref NONE}
32 :     fun sameCell (CELL{value=v1, ...}, CELL{value=v2, ...}) = (v1 = v2)
33 :    
34 :     (* create a new transaction ID *)
35 :     fun mkId () = ref(R.TRANS(S.getCurThread()))
36 :    
37 :     (* given a transaction ID, get its thread ID and mark it cancelled. *)
38 :     fun getIdFromTrans (transId as ref(R.TRANS tid)) = (
39 :     transId := R.CANCEL;
40 :     tid)
41 :    
42 :     (* bump a priority value by one, returning the old value *)
43 :     fun bumpPriority (p as ref n) = (p := n+1; n)
44 :    
45 :     datatype 'a q_item
46 :     = NoItem
47 :     | Item of (R.trans_id ref * 'a cont)
48 :    
49 :     (* functions to clean channel input and output queues *)
50 :     local
51 :     fun clean [] = []
52 :     | clean ((ref R.CANCEL, _)::r) = clean r
53 :     | clean l = l
54 :     fun cleanRev ([], l) = l
55 :     | cleanRev ((ref R.CANCEL, _)::r, l) = cleanRev (r, l)
56 :     | cleanRev (x::r, l) = cleanRev (r, x::l)
57 :     in
58 :     fun cleanAndChk (priority, R.Q{front, rear}) = let
59 :     fun cleanFront [] = cleanRear (! rear)
60 :     | cleanFront f = (case (clean f)
61 :     of [] => cleanRear (! rear)
62 :     | f' => (front := f'; bumpPriority priority)
63 :     (* end case *))
64 :     and cleanRear [] = 0
65 :     | cleanRear r = (
66 :     rear := [];
67 :     case (cleanRev (r, []))
68 :     of [] => 0
69 :     | rr => (front := rr; bumpPriority priority)
70 :     (* end case *))
71 :     in
72 :     cleanFront (! front)
73 :     end
74 :     fun cleanAndRemove (R.Q{front, rear, ...}) = let
75 :     fun cleanFront [] = cleanRear (! rear)
76 :     | cleanFront f = (case (clean f)
77 :     of [] => cleanRear (! rear)
78 :     | (item::rest) => (front := rest; Item item)
79 :     (* end case *))
80 :     and cleanRear [] = NoItem
81 :     | cleanRear r = (
82 :     rear := [];
83 :     case (cleanRev (r, []))
84 :     of [] => NoItem
85 :     | (item::rest) => (front := rest; Item item)
86 :     (* end case *))
87 :     in
88 :     cleanFront (! front)
89 :     end
90 :     fun cleanAndEnqueue (R.Q{front, rear, ...}, item) = let
91 :     fun cleanFront [] = cleanRear (! rear)
92 :     | cleanFront f = (case (clean f)
93 :     of [] => cleanRear (! rear)
94 :     | f' => (front := f'; rear := item :: (! rear))
95 :     (* end case *))
96 :     and cleanRear [] = (front := [item])
97 :     | cleanRear r = (case (cleanRev (r, []))
98 :     of [] => (front := [item]; rear := [])
99 :     | rr => (rear := [item]; front := rr)
100 :     (* end case *))
101 :     in
102 :     cleanFront (! front)
103 :     end
104 :     end (* local *)
105 :    
106 :     (* When a thread is resumed after being blocked on an iGet or mGet operation,
107 :     * there may be other threads also blocked on the variable. This function
108 :     * is used to propagate the message to all of the threads that are blocked
109 :     * on the variable (or until one of them takes the value in the mvar case).
110 :     * It must be called from an atomic region; when the readQ is finally empty,
111 :     * we leave the atomic region. We must use "cleanAndRemove" to get items
112 :     * from the readQ in the unlikely event that a single thread executes a
113 :     * choice of multiple gets on the same variable.
114 :     *)
115 :     fun relayMsg (readQ, msg) = (case (cleanAndRemove readQ)
116 :     of NoItem => S.atomicEnd()
117 :     | (Item(transId, kont)) => callcc (fn myKont => (
118 :     S.enqueueAndSwitchCurThread(myKont, getIdFromTrans transId);
119 :     throw kont msg))
120 :     (* end case *))
121 :    
122 :     fun impossible () = raise Fail "SyncVar: impossible"
123 :    
124 :    
125 :     (** I-variables **)
126 :    
127 :     val iVar = newCell
128 :     val sameIVar = sameCell
129 :    
130 :     fun iPut (CELL{priority, readQ, value}, x) = (
131 :     S.atomicBegin();
132 :     case !value
133 :     of NONE => (
134 :     value := SOME x;
135 :     case (cleanAndRemove readQ)
136 :     of NoItem => S.atomicEnd()
137 :     | (Item(transId, kont)) => callcc (fn myKont => (
138 :     S.enqueueAndSwitchCurThread(myKont, getIdFromTrans transId);
139 :     priority := 1;
140 :     throw kont x))
141 :     (* end case *))
142 :     | (SOME _) => (S.atomicEnd(); raise Put)
143 :     (* end case *))
144 :    
145 :     fun iGet (CELL{priority, readQ, value}) = (
146 :     S.atomicBegin();
147 :     case !value
148 :     of NONE => let
149 :     val msg = callcc (fn k => (
150 :     Q.enqueue (readQ, (mkId(), k));
151 :     S.atomicDispatch ()))
152 :     in
153 :     relayMsg (readQ, msg); msg
154 :     end
155 :     | (SOME v) => (S.atomicEnd(); v)
156 :     (* end case *))
157 :    
158 :     fun iGetEvt (CELL{priority, readQ, value}) = let
159 :     fun blockFn {transId, cleanUp, next} = let
160 :     val msg = callcc (fn k => (
161 :     Q.enqueue (readQ, (transId, k));
162 :     next ();
163 :     impossible()))
164 :     in
165 :     cleanUp();
166 :     relayMsg (readQ, msg); msg
167 :     end
168 :     fun pollFn () = (case !value
169 :     of NONE => R.BLOCKED blockFn
170 :     | (SOME v) => R.ENABLED{
171 :     prio=bumpPriority priority,
172 :     doFn=(fn () => (priority := 1; S.atomicEnd(); v))
173 :     }
174 :     (* end case *))
175 :     in
176 :     R.BEVT[pollFn]
177 :     end
178 :    
179 : jhr 1924 fun iGetPoll (CELL{priority, readQ, value}) = let
180 :     val res = (
181 :     S.atomicBegin();
182 :     case !value
183 :     of NONE => NONE
184 :     | (SOME v) => SOME v
185 :     (* end case *))
186 :     in
187 :     S.atomicEnd();
188 :     res
189 :     end
190 : monnier 2
191 :    
192 :     (** M-variables **)
193 :    
194 :     val mVar = newCell
195 :     fun mVarInit x = CELL{priority = ref 0, readQ = Q.queue(), value=ref(SOME x)}
196 :     val sameMVar = sameCell
197 :    
198 :     fun mPut (CELL{priority, readQ, value}, x) = (
199 :     S.atomicBegin();
200 :     case !value
201 :     of NONE => (
202 :     value := SOME x;
203 :     case (cleanAndRemove readQ)
204 :     of NoItem => S.atomicEnd()
205 :     | (Item(transId, kont)) => callcc (fn myKont => (
206 :     S.enqueueAndSwitchCurThread(myKont, getIdFromTrans transId);
207 :     priority := 1;
208 :     throw kont x))
209 :     (* end case *))
210 :     | (SOME _) => (S.atomicEnd(); raise Put)
211 :     (* end case *))
212 :    
213 :     fun mTake (CELL{priority, readQ, value}) = (
214 :     S.atomicBegin();
215 :     case !value
216 :     of NONE => let
217 :     val v = callcc (fn k => (
218 :     Q.enqueue (readQ, (mkId(), k));
219 :     S.atomicDispatch ()))
220 :     in
221 :     value := NONE;
222 :     S.atomicEnd();
223 :     v
224 :     end
225 :     | (SOME v) => (value := NONE; S.atomicEnd(); v)
226 :     (* end case *))
227 :    
228 :     fun mTakeEvt (CELL{priority, readQ, value}) = let
229 :     fun blockFn {transId, cleanUp, next} = let
230 :     val v = callcc (fn k => (
231 :     Q.enqueue (readQ, (transId, k));
232 :     next ();
233 :     impossible()))
234 :     in
235 :     cleanUp();
236 :     value := NONE;
237 :     S.atomicEnd();
238 :     v
239 :     end
240 :     fun pollFn () = (case !value
241 :     of NONE => R.BLOCKED blockFn
242 :     | (SOME v) => R.ENABLED{
243 :     prio=bumpPriority priority,
244 :     doFn=(fn () => (value := NONE; S.atomicEnd(); v))
245 :     }
246 :     (* end case *))
247 :     in
248 :     R.BEVT[pollFn]
249 :     end
250 :    
251 : jhr 1924 fun mTakePoll (CELL{priority, readQ, value}) = let
252 :     val res = (
253 :     S.atomicBegin();
254 :     case !value
255 :     of NONE => NONE
256 :     | (SOME v) => (value := NONE; SOME v)
257 :     (* end case *))
258 :     in
259 :     S.atomicEnd(); res
260 :     end
261 : monnier 2
262 :     fun mGet (CELL{priority, readQ, value}) = (
263 :     S.atomicBegin();
264 :     case !value
265 :     of NONE => let
266 :     val v = callcc (fn k => (
267 :     Q.enqueue (readQ, (mkId(), k));
268 :     S.atomicDispatch ()))
269 :     in
270 :     relayMsg (readQ, v); v
271 :     end
272 :     | (SOME v) => (S.atomicEnd(); v)
273 :     (* end case *))
274 :    
275 :     fun mGetEvt (CELL{priority, readQ, value}) = let
276 :     fun blockFn {transId, cleanUp, next} = let
277 :     val v = callcc (fn k => (
278 :     Q.enqueue (readQ, (transId, k));
279 :     next ();
280 :     impossible()))
281 :     in
282 :     cleanUp();
283 :     relayMsg (readQ, v);
284 :     v
285 :     end
286 :     fun pollFn () = (case !value
287 :     of NONE => R.BLOCKED blockFn
288 :     | (SOME v) => R.ENABLED{
289 :     prio=bumpPriority priority,
290 :     doFn=(fn () => (S.atomicEnd(); v))
291 :     }
292 :     (* end case *))
293 :     in
294 :     R.BEVT[pollFn]
295 :     end
296 :    
297 :     fun mGetPoll (CELL{priority, readQ, value}) = (
298 :     S.atomicBegin();
299 :     case !value
300 :     of NONE => NONE
301 :     | (SOME v) => (S.atomicEnd(); SOME v)
302 :     (* end case *))
303 :    
304 :     (* Swap the current contents of the cell with a new value. This function
305 :     * has the effect of an mTake followed by an mPut, except that it is
306 :     * guaranteed to be atomic. It is also somewhat more efficient.
307 :     *)
308 :     fun mSwap (CELL{priority, readQ, value}, newV) = (
309 :     S.atomicBegin();
310 :     case !value
311 :     of NONE => let
312 :     val v = callcc (fn k => (
313 :     Q.enqueue (readQ, (mkId(), k));
314 :     S.atomicDispatch ()))
315 :     in
316 :     value := SOME newV;
317 :     (* relay the new value to any other blocked threads *)
318 :     relayMsg (readQ, newV);
319 :     v
320 :     end
321 :     | (SOME v) => (value := SOME newV; S.atomicEnd(); v)
322 :     (* end case *))
323 :    
324 :     fun mSwapEvt (CELL{priority, readQ, value}, newV) = let
325 :     fun blockFn {transId, cleanUp, next} = let
326 :     val v = callcc (fn k => (
327 :     Q.enqueue (readQ, (transId, k));
328 :     next ();
329 :     impossible()))
330 :     in
331 :     cleanUp();
332 :     value := SOME newV;
333 :     relayMsg (readQ, newV);
334 :     v
335 :     end
336 :     fun pollFn () = (case !value
337 :     of NONE => R.BLOCKED blockFn
338 :     | (SOME v) => R.ENABLED{
339 :     prio=bumpPriority priority,
340 :     doFn=(fn () => (value := SOME newV; S.atomicEnd(); v))
341 :     }
342 :     (* end case *))
343 :     in
344 :     R.BEVT[pollFn]
345 :     end
346 :    
347 :     end; (* SyncVar *)

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0