Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cml/src/IO/new-text-io-fn.sml
ViewVC logotype

Annotation of /sml/trunk/src/cml/src/IO/new-text-io-fn.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1323 - (view) (download)

1 : blume 844 (* text-io-fn.sml
2 :     *
3 :     * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
4 :     *
5 :     * This is the CML version of the TextIO functor.
6 :     *)
7 :    
8 :     functor TextIOFn (
9 :    
10 :     structure OSPrimIO : sig
11 :     include OS_PRIM_IO
12 :     val stdIn : unit -> PrimIO.reader
13 :     val stdOut : unit -> PrimIO.writer
14 :     val stdErr : unit -> PrimIO.writer
15 :     val strReader : string -> PrimIO.reader
16 :     end
17 :     where type PrimIO.array = TextPrimIO.array
18 :     where type PrimIO.vector = TextPrimIO.vector
19 :     where type PrimIO.elem = TextPrimIO.elem
20 :     where type PrimIO.pos = TextPrimIO.pos
21 :     where type PrimIO.reader = TextPrimIO.reader
22 :     where type PrimIO.writer = TextPrimIO.writer
23 :    
24 :     ) : CML_TEXT_IO = struct
25 :    
26 :     structure PIO = OSPrimIO.PrimIO
27 :     structure A = CharArray
28 :     structure V = CharVector
29 :    
30 :     structure SV = SyncVar
31 :    
32 :     (* store x in an MVar via mSwap, throwing away the value that was swapped out *)
33 :     fun mUpdate (mv, x) = (SV.mSwap(mv, x); ())
34 :    
35 :     (* an element for initializing buffers *)
36 :     val someElem = #"\000"
37 :    
38 :     val vecExtract = V.extract
39 :     val vecSub = V.sub
40 :     val arrUpdate = A.update
41 :     val substringBase = Substring.base
42 :     val empty = ""
43 :    
44 :     fun dummyCleaner () = ()
45 :    
46 :     structure StreamIO =
47 :     struct
48 :     type vector = V.vector
49 :     type elem = V.elem
50 :     type reader = PIO.reader
51 :     type writer = PIO.writer
52 :     type pos = PIO.pos
53 :    
54 :     (*** Functional input streams ***)
55 :     datatype instream = ISTRM of (in_buffer * int)
56 :     and in_buffer = IBUF of {
57 :     basePos : pos option,
58 :     more : more SV.mvar, (* when this cell is empty, it means that *)
59 :     (* there is an outstanding request to the *)
60 :     (* server to extend the stream. *)
61 :     data : vector,
62 :     info : info
63 :     }
64 :     and more
65 :     = MORE of in_buffer (* forward link to additional data *)
66 :     | NOMORE (* placeholder for forward link *)
67 :     | TERMINATED (* termination of the stream *)
68 :    
69 :     and info = INFO of {
70 :     reader : reader,
71 :     readVec : int -> vector,
72 :     readVecEvt : int -> vector CML.event,
73 :     closed : bool ref,
74 :     getPos : unit -> pos option,
75 :     tail : more SV.mvar SV.mvar,
76 :     (* points to the more cell of the last buffer *)
77 :     cleanTag : CleanIO.tag
78 :     }
79 :    
80 :     fun infoOfIBuf (IBUF{info, ...}) = info
81 :     fun chunkSzOfIBuf (IBUF{info, ...}) = let
82 :     (* pull the chunk size straight out of the reader record held in info *)
83 :     val INFO{reader=PIO.RD{chunkSize=sz, ...}, ...} = info
84 :     in sz
85 :     end
86 :     fun readVec (IBUF{info=INFO{readVec=f, ...}, ...}) = f
87 :    
88 :     fun inputExn (INFO{reader=PIO.RD{name, ...}, ...}, mlOp, exn) = (* never returns: wraps exn in IO.Io, tagged with the reader's name and the operation name mlOp *)
89 :     raise IO.Io{function=mlOp, name=name, cause=exn}
90 :    
91 :     datatype more_data = EOF | DATA of in_buffer
92 :    
93 :     (* extend the stream by a chunk read with readFn; returns EOF for an empty chunk, otherwise DATA of the newly linked buffer.
94 :     * Invariant: the more m-variable is empty on entry and full on exit.
95 :     *)
96 :     fun extendStream (readFn, mlOp, buf as IBUF{more, info, ...}) = (let
97 :     val INFO{getPos, tail, ...} = info
98 :     val basePos = getPos() (* position is sampled before the read, so it describes the start of the new chunk *)
99 :     val chunk = readFn (chunkSzOfIBuf buf)
100 :     in
101 :     if (V.length chunk = 0)
102 :     then (SV.mPut (more, NOMORE); EOF) (* nothing read: refill the cell unchanged and report EOF *)
103 :     else let
104 :     val newMore = SV.mVar() (* created empty; filled with NOMORE below *)
105 :     val buf' = IBUF{
106 :     basePos = basePos, data = chunk,
107 :     more = newMore, info = info
108 :     }
109 :     in
110 :     (* note that we do not fill the more cell until
111 :     * after the tail has been updated. This ensures
112 :     * that someone attempting to access the tail will
113 :     * not acquire the lock until after we are done.
114 :     *)
115 :     mUpdate (tail, newMore);
116 :     SV.mPut (more, MORE buf'); (* releases lock!! *)
117 :     SV.mPut (newMore, NOMORE);
118 :     DATA buf'
119 :     end
120 :     end
121 :     handle ex => (
122 :     SV.mPut (more, NOMORE); (* refill the cell before re-raising, so the invariant above also holds on failure *)
123 :     inputExn(info, mlOp, ex)))
124 :    
125 :     (* get the next buffer in the stream, extending it if necessary. If
126 :     * the stream must be extended, we lock it by taking the value from the
127 :     * more cell; the extendStream function is responsible for filling in
128 :     * the cell.
129 :     *)
130 :     fun getBuffer (readFn, mlOp) (buf as IBUF{more, info, ...}) = let
131 :     fun get TERMINATED = EOF (* a terminated stream yields no further data *)
132 :     | get (MORE buf') = DATA buf'
133 :     | get NOMORE = (case SV.mTake more
134 :     of NOMORE => extendStream (readFn, mlOp, buf) (* still the last buffer: read more while holding the cell *)
135 :     | next => (SV.mPut(more, next); get next) (* cell changed between the mGet and the mTake: restore it and dispatch on the new value *)
136 :     (* end case *))
137 :     in
138 :     get (SV.mGet more)
139 :     end
140 :    
141 :     (* build a read function for buf that requests at least n elements, in whole chunks *)
142 :     fun readChunk buf = let
143 :     val INFO{readVec=rd, reader=PIO.RD{chunkSize=csz, ...}, ...} =
144 :     infoOfIBuf buf
145 :     val k = csz - 1
146 :     fun padded n = rd (Int.quot(n+k, csz) * csz) (* round up to next multiple of the chunk size *)
147 :     in
148 :     if (k = 0)
149 :     then (fn n => rd n)
150 :     else padded
151 :     end
152 :    
153 :     fun generalizedInput getBuf = let (* returns the unread rest of the current buffer, asking getBuf for the next buffer when this one is used up *)
154 :     fun fetch (ISTRM(cur as IBUF{data=chunk, ...}, off)) = let
155 :     val n = V.length chunk
156 :     in
157 :     if (off >= n)
158 :     then (case (getBuf cur)
159 :     of (DATA nxt) => fetch (ISTRM(nxt, 0))
160 :     | EOF => (empty, ISTRM(cur, n))
161 :     (* end case *))
162 :     else (vecExtract(chunk, off, NONE), ISTRM(cur, n))
163 :     end
164 :     in
165 :     fetch
166 :     end
167 :    
168 :     (* terminate an input stream: mark the more cell recorded in tail as TERMINATED and remove the stream's cleaner *)
169 :     fun terminate (info as INFO{tail, cleanTag, ...}) = let
170 :     val m = SV.mGet tail
171 :     in
172 :     case SV.mTake m
173 :     of (m' as MORE _) => (SV.mPut(m, m'); terminate info) (* this cell links to a further buffer, so it is not the last one: restore it and retry with the current tail *)
174 :     | TERMINATED => SV.mPut(m, TERMINATED) (* already terminated: just restore the cell *)
175 :     | _ => (
176 :     CleanIO.removeCleaner cleanTag;
177 :     SV.mPut(m, TERMINATED))
178 :     (* end case *)
179 :     end
180 :    
181 :     (* follow the MORE links to the last buffer and return a stream positioned at the end of its data *)
182 :     fun findEOS (last as IBUF{more=link, data=chunk, ...}) = (case (SV.mGet link)
183 :     of (MORE nxt) => findEOS nxt
184 :     | _ => ISTRM(last, V.length chunk)
185 :     (* end case *))
186 :    
187 :     fun input (strm as ISTRM(buf, _)) =
188 :     generalizedInput (getBuffer (readVec buf, "input")) strm
189 :     fun input1 (ISTRM(buf, pos)) = let (* returns SOME(next element, advanced stream), or NONE when no element is available *)
190 :     val IBUF{data, more, ...} = buf
191 :     in
192 :     if (pos < V.length data)
193 :     then SOME(vecSub(data, pos), ISTRM(buf, pos+1)) (* fast path: the element is already buffered *)
194 :     else let
195 :     fun get (MORE buf) = input1 (ISTRM(buf, 0))
196 :     | get TERMINATED = NONE
197 :     | get NOMORE = (case SV.mTake more
198 :     of NOMORE => ( (* we now hold the cell; extendStream refills it *)
199 :     case extendStream (readVec buf, "input1", buf)
200 :     of EOF => NONE
201 :     | (DATA rest) => input1 (ISTRM(rest, 0))
202 :     (* end case *))
203 :     | next => (SV.mPut(more, next); get next) (* cell changed since the mGet: restore it and dispatch again *)
204 :     (* end case *))
205 :     in
206 :     get (SV.mGet more)
207 :     end
208 :     end
209 :     fun inputN (ISTRM(buf, pos), n) = let
210 :     fun join (item, (list, strm)) = (item::list, strm)
211 :     fun inputList (buf as IBUF{data, ...}, i, n) = let
212 :     val len = V.length data
213 :     val remain = len-i
214 :     in
215 :     if (remain >= n)
216 :     then ([vecExtract(data, i, SOME n)], ISTRM(buf, i+n))
217 :     else join (
218 :     vecExtract(data, i, NONE),
219 :     nextBuf(buf, n-remain))
220 :     end
221 :     and nextBuf (buf as IBUF{more, data, ...}, n) = let
222 :     fun get (MORE buf) = inputList (buf, 0, n)
223 :     | get TERMINATED = ([], ISTRM(buf, V.length data))
224 :     | get NOMORE = (case (SV.mTake more)
225 :     of NOMORE => (case extendStream (readVec buf, "inputN", buf)
226 :     of EOF => ([], ISTRM(buf, V.length data))
227 :     | (DATA rest) => inputList (rest, 0, n)
228 :     (* end case *))
229 :     | next => (SV.mPut(more, next); get next)
230 :     (* end case *))
231 :     in
232 :     get (SV.mGet more)
233 :     end
234 :     val (data, strm) = inputList (buf, pos, n)
235 :     in
236 :     (V.concat data, strm)
237 :     end
238 :    
239 :     fun inputAll (strm as ISTRM(buf, _)) = let
240 :     val INFO{reader=PIO.RD{avail, ...}, ...} = infoOfIBuf buf
241 :     (* read a chunk that is as large as the available input. Note
242 :     * that for systems that use CR-LF for #"\n", the size will be
243 :     * too large, but this should be okay.
244 :     *)
245 :     fun bigChunk _ = let
246 :     val delta = (case avail()
247 :     of NONE => chunkSzOfIBuf buf
248 :     | (SOME n) => n
249 :     (* end case *))
250 :     in
251 :     readChunk buf delta
252 :     end
253 :     val bigInput =
254 :     generalizedInput (getBuffer (bigChunk, "inputAll"))
255 :     fun loop (v, strm) =
256 :     if (V.length v = 0) then [] else v :: loop(bigInput strm)
257 :     val data = V.concat (loop (bigInput strm))
258 :     in
259 :     (data, findEOS buf)
260 :     end
261 :    
262 :     (* Return SOME k, if k <= amount characters can be read without blocking. *)
263 :     fun canInput (strm as ISTRM(buf, pos), amount) = let
264 :     (******
265 :     val readVecNB = (case buf
266 :     of (IBUF{info as INFO{readVecNB=NONE, ...}, ...}) =>
267 :     inputExn(info, "canInput", IO.NonblockingNotSupported)
268 :     | (IBUF{info=INFO{readVecNB=SOME f, ...}, ...}) => f
269 :     (* end case *))
270 :     ******)
271 :     fun tryInput (buf as IBUF{data, ...}, i, n) = let
272 :     val len = V.length data
273 :     val remain = len - i
274 :     in
275 :     if (remain >= n)
276 :     then SOME n
277 :     else nextBuf (buf, n - remain)
278 :     end
279 :     and nextBuf (IBUF{more, ...}, n) = let
280 :     fun get (MORE buf) = tryInput (buf, 0, n)
281 :     | get TERMINATED = SOME(amount - n)
282 :     (******
283 :     | get NOMORE = (case SV.mTake more
284 :     of NOMORE => ((
285 :     case extendStream (readVecNB, "canInput", buf)
286 :     of EOF => SOME(amount - n)
287 :     | (DATA b) => tryInput (b, 0, n)
288 :     (* end case *))
289 :     handle IO.Io{cause=WouldBlock, ...} => SOME(amount - n))
290 :     | next => (SV.mPut(more, next); get next)
291 :     (* end case *))
292 :     ******)
293 :     | get NOMORE = SOME(amount - n)
294 :     in
295 :     get (SV.mGet more)
296 :     end
297 :     in
298 :     if (amount < 0)
299 :     then raise Size
300 :     else tryInput (buf, pos, amount)
301 :     end
302 : jhr 1267 (* close an input stream given its info structure; we need this function
303 :     * for the cleanup hook to avoid a space leak.
304 :     *)
305 :     fun closeInInfo (INFO{closed=ref true, ...}) = () (* already closed: nothing to do *)
306 :     | closeInInfo (info as INFO{closed, reader=PIO.RD{close, ...}, ...}) = (
307 : blume 844 (*** We need some kind of lock on the input stream to do this safely!!! ***)
308 : jhr 1267 terminate info;
309 :     closed := true; (* set before calling close, so the flag stays true even when close raises *)
310 :     close() handle ex => inputExn(info, "closeIn", ex)) (* a failing close is re-raised as IO.Io for "closeIn" *)
311 :     fun closeIn (ISTRM(buf, _)) = closeInInfo (infoOfIBuf buf)
312 : blume 844 fun endOfStream (ISTRM(buf as IBUF{more, ...}, pos)) = ( (* true iff no further element can be read from this position; may read from the reader to find out *)
313 :     case SV.mTake more
314 :     of (next as MORE _) => (SV.mPut(more, next); false) (* a following buffer exists: restore the cell, not at end *)
315 :     | next => let
316 :     val IBUF{data, info=INFO{closed, ...}, ...} = buf
317 :     in
318 :     if (pos = V.length data)
319 :     then (case (next, !closed)
320 :     of (NOMORE, false) => ( (* open stream with no data yet: try to read; extendStream refills the cell taken above *)
321 :     case extendStream (readVec buf, "endOfStream", buf)
322 :     of EOF => true
323 :     | _ => false
324 :     (* end case *))
325 :     | _ => (SV.mPut(more, next); true) (* terminated or closed: restore the cell and report end of stream *)
326 :     (* end case *))
327 :     else (SV.mPut(more, next); false) (* unread data remains in this buffer *)
328 :     end
329 :     (* end case *))
330 :     fun mkInstream' (reader, data) = let
331 :     val PIO.RD{readVec, readVecEvt, getPos, setPos, ...} = reader
332 :     val getPos = (case (getPos, setPos)
333 :     of (SOME f, SOME _) => (fn () => SOME(f()))
334 :     | _ => (fn () => NONE)
335 :     (* end case *))
336 :     val more = SV.mVarInit NOMORE
337 :     val closedFlg = ref false
338 :     val tag = CleanIO.addCleaner dummyCleaner
339 :     val info = INFO{
340 :     reader=reader, readVec=readVec, readVecEvt=readVecEvt,
341 :     closed = closedFlg, getPos = getPos,
342 :     tail = SV.mVarInit more, cleanTag = tag
343 :     }
344 :     (** What should we do about the position in this case ?? **)
345 :     (** Suggestion: When building a stream with supplied initial data,
346 :     ** nothing can be said about the positions inside that initial
347 :     ** data (who knows where that data even came from!).
348 :     **)
349 :     val basePos = if (V.length data = 0) then getPos () else NONE
350 : jhr 1267 val buf = IBUF {
351 :     basePos = basePos, data = data,
352 :     info = info, more = more
353 :     }
354 : blume 844 val strm = ISTRM(buf, 0)
355 :     in
356 :     (tag, strm)
357 :     end
358 :     fun mkInstream arg = let (* build an instream with mkInstream' and install its real cleanup action *)
359 : jhr 1267 val (tag, strm as ISTRM(IBUF{info, ...}, _)) = mkInstream' arg
360 : blume 844 in
361 : jhr 1267 CleanIO.rebindCleaner (tag, fn () => closeInInfo info); (* the cleaner captures only info, not the stream value *)
362 : blume 844 strm
363 :     end
364 :     fun getReader (ISTRM(buf, pos)) = let (* terminate the stream and return its reader together with all buffered data not yet consumed *)
365 :     val IBUF{data, info as INFO{reader, ...}, more, ...} = buf
366 :     fun getData more = (case SV.mGet more (* collect the data vectors of every buffer linked after this cell *)
367 :     of (MORE(IBUF{data, more=more', ...})) => data :: getData more'
368 :     | _ => []
369 :     (* end case *))
370 :     in
371 :     terminate info;
372 :     if (pos < V.length data)
373 :     then (
374 :     reader,
375 :     V.concat(vecExtract(data, pos, NONE) :: getData more) (* unread tail of the current buffer, then the later buffers *)
376 :     )
377 :     else (reader, V.concat(getData more))
378 :     end
379 :    
380 :     (*
381 :     (** Position operations on instreams **)
382 :     datatype in_pos = INP of {
383 :     base : pos,
384 :     offset : int,
385 :     info : info
386 :     }
387 :     *)
388 :    
389 :     (*
390 :     fun getPosIn (ISTRM(buf, pos)) = (case buf
391 :     of IBUF{basePos=NONE, info, ...} =>
392 :     inputExn (info, "getPosIn", IO.RandomAccessNotSupported)
393 :     | IBUF{basePos=SOME p, info, ...} => INP{
394 :     base = p, offset = pos, info = info
395 :     }
396 :     (* end case *))
397 :     *)
398 :     (*
399 :     fun filePosIn (INP{base, offset, ...}) =
400 :     Position.+(base, Position.fromInt offset)
401 :     *)
402 :     (* Get the underlying file position of a stream *)
403 :     fun filePosIn (ISTRM(buf, pos)) = (case buf
404 :     of IBUF{basePos=NONE, info, ...} =>
405 :     inputExn (info, "filePosIn", IO.RandomAccessNotSupported)
406 :     | IBUF{basePos=SOME base, info, ...} => let
407 :     val INFO{reader=PIO.RD rd, readVec, ...} = info
408 :     in
409 :     case (#getPos rd, #setPos rd)
410 :     of (SOME getPos, SOME setPos) => let
411 :     val tmpPos = getPos()
412 :     fun readN 0 = ()
413 :     | readN n = (case V.length(readVec n)
414 :     of 0 => inputExn (
415 :     info, "filePosIn", Fail "bogus position")
416 :     | k => readN(n-k)
417 :     (* end case *))
418 :     in
419 :     setPos base;
420 :     readN pos;
421 :     getPos () before setPos tmpPos
422 :     end
423 :     | _ => raise Fail "filePosIn: impossible"
424 :     (* end case *)
425 :     end
426 :     (* end case *))
427 :     (*
428 :     fun setPosIn (pos as INP{info as INFO{reader, ...}, ...}) = let
429 :     val fpos = filePosIn pos
430 :     val (PIO.RD rd) = reader
431 :     in
432 :     terminate info;
433 :     valOf (#setPos rd) fpos;
434 :     mkInstream (PIO.RD rd, NONE)
435 :     end
436 :     *)
437 :    
438 :     (** Text stream specific operations **)
439 :     fun inputLine (ISTRM(buf as IBUF{data, ...}, pos)) = let
440 :     fun join (item, (list, strm)) = (item::list, strm)
441 :     fun nextBuf (isEmpty, buf as IBUF{more, data, ...}) = let
442 :     fun last () =
443 :     (if isEmpty then [] else ["\n"], ISTRM(buf, V.length data))
444 :     fun get (MORE buf) = scanData (buf, 0)
445 :     | get NOMORE = (case (SV.mTake more)
446 :     of NOMORE => (
447 :     case extendStream (readVec buf, "inputLine", buf)
448 :     of EOF => last ()
449 :     | (DATA rest) => scanData (rest, 0)
450 :     (* end case *))
451 :     | next => (SV.mPut(more, next); get next)
452 :     (* end case *))
453 :     | get TERMINATED = last()
454 :     in
455 :     get (SV.mGet more)
456 :     end
457 :     and scanData (buf as IBUF{data, ...}, i) = let
458 :     val len = V.length data
459 :     fun scan j = if (j = len)
460 :     then join(vecExtract(data, i, NONE), nextBuf(false, buf))
461 :     else if (vecSub(data, j) = #"\n")
462 :     then ([vecExtract(data, i, SOME(j+1-i))], ISTRM(buf, j+1))
463 :     else scan (j+1)
464 :     in
465 :     scan i
466 :     end
467 :     val (data, strm) = if (V.length data = pos)
468 :     then nextBuf (true, buf)
469 :     else scanData (buf, pos)
470 :     in
471 :     (V.concat data, strm)
472 :     end
473 :    
474 : jhr 1323 (* IO event constructors:
475 :     * We exploit the "functional" nature of stream IO to implement the event
476 :     * constructors. These constructors spawn a thread to do the operation
477 :     * and write the result in an iVar that serves as the synchronization
478 :     * value.
479 :     * NOTE: this implementation has the weakness that it prevents shutdown when
480 :     * everything else is deadlocked, since the thread that is spawned to actually
481 :     * do the IO could proceed.
482 :     *)
483 :     local
484 :     datatype 'a result = RES of 'a | EXN of exn
485 :     fun doInput inputOp = let
486 :     fun input arg = RES(inputOp arg) handle ex => EXN ex
487 :     in
488 :     fn arg => CML.guard (fn () => let
489 :     val resV = SV.iVar()
490 :     in
491 :     CML.spawn (fn () => SV.iPut(resV, input arg));
492 :     CML.wrap(SV.iGetEvt resV,
493 :     fn (RES x) => x | (EXN ex) => raise ex)
494 :     end)
495 :     end
496 :     in
497 :     val input1Evt = doInput input1
498 :     val inputEvt = doInput input
499 :     val inputNEvt = doInput inputN
500 :     val inputAllEvt = doInput inputAll
501 :     val inputLineEvt = doInput inputLine
502 :     end (* local *)
503 :    
504 :    
505 : blume 844 (*** Output streams ***)
506 :    
507 :     (* an output stream is implemented as a monitor using an mvar to
508 :     * hold its data.
509 :     *)
510 :     datatype ostrm_info = OSTRM of {
511 :     buf : A.array,
512 :     pos : int ref,
513 :     closed : bool ref,
514 :     bufferMode : IO.buffer_mode ref,
515 :     writer : writer,
516 :     writeArr : {buf : A.array, i : int, sz : int option} -> unit,
517 :     writeVec : {buf : V.vector, i : int, sz : int option} -> unit,
518 :     cleanTag : CleanIO.tag
519 :     }
520 :    
521 :     type outstream = ostrm_info SV.mvar
522 :    
523 :     fun isNL #"\n" = true
524 :     | isNL _ = false
525 :    
526 :     fun isLineBreak (OSTRM{bufferMode, ...}) =
527 :     if (!bufferMode = IO.LINE_BUF) then isNL else (fn _ => false)
528 :    
529 :     fun outputExn (OSTRM{writer=PIO.WR{name, ...}, ...}, mlOp, exn) =
530 :     raise IO.Io{function=mlOp, name=name, cause=exn}
531 :    
532 :     (* lock access to the stream and make sure that it is not closed. On success the caller holds the stream value and must put it back into strmMV. *)
533 :     fun lockAndChkClosedOut (strmMV, mlOp) = (case SV.mTake strmMV
534 :     of (strm as OSTRM({closed=ref true, ...})) => (
535 :     SV.mPut (strmMV, strm); (* unlock before raising IO.Io with cause ClosedStream *)
536 :     outputExn (strm, mlOp, IO.ClosedStream))
537 :     | strm => strm
538 :     (* end case *))
539 :    
540 :     fun flushBuffer (strmMV, strm as OSTRM{buf, pos, writeArr, ...}, mlOp) = ( (* write the first !pos elements of buf and reset pos; the caller holds the stream lock *)
541 :     case !pos
542 :     of 0 => () (* nothing buffered *)
543 :     | n => ((
544 :     writeArr {buf=buf, i=0, sz=SOME n}; pos := 0)
545 :     handle ex => (
546 :     SV.mPut(strmMV, strm); outputExn (strm, mlOp, ex))) (* on failure the lock is released here, before re-raising as IO.Io *)
547 :     (* end case *))
548 :    
549 :     (* Copy srcLen characters of src, starting at srcI, into dst at dstI, and
550 :     * report whether any copied character was a newline (used for LINE_BUF output).
551 :     *)
552 :     fun lineBufCopyVec (src, srcI, srcLen, dst, dstI) = let
553 :     val limit = srcI+srcLen
554 :     fun go (si, di, sawNL) =
555 :     if (si >= limit)
556 :     then sawNL
557 :     else let val ch = vecSub(src, si)
558 :     val _ = arrUpdate (dst, di, ch)
559 :     in
560 :     go (si+1, di+1, sawNL orelse isNL ch)
561 :     end
562 :     in
563 :     go (srcI, dstI, false)
564 :     end
565 :    
566 :     (* a version of copyVec for BLOCK_BUF output of strings and substrings. *)
567 :     fun blockBufCopyVec (src, srcI, srcLen, dst, dstI) = (
568 :     A.copyVec {
569 :     src = src, si = srcI, len = SOME srcLen, dst = dst, di = dstI
570 :     };
571 :     false)
572 :    
573 :     fun output (strmMV, v) = let (* write vector v to the stream according to its buffer mode; the lock is held for the whole operation *)
574 :     val (strm as OSTRM os) = lockAndChkClosedOut (strmMV, "output")
575 :     fun release () = SV.mPut (strmMV, strm)
576 :     val {buf, pos, bufferMode, ...} = os
577 :     fun flush () = flushBuffer (strmMV, strm, "output")
578 :     fun flushAll () = (#writeArr os {buf=buf, i=0, sz=NONE} (* write the entire buffer array, regardless of pos *)
579 :     handle ex => (release(); outputExn (strm, "output", ex)))
580 :     fun writeDirect () = ( (* write any pending buffer contents, then v itself, bypassing the buffer *)
581 :     case !pos
582 :     of 0 => ()
583 :     | n => (#writeArr os {buf=buf, i=0, sz=SOME n}; pos := 0)
584 :     (* end case *);
585 :     #writeVec os {buf=v, i=0, sz=NONE})
586 :     handle ex => (release(); outputExn (strm, "output", ex))
587 :     fun insert copyVec = let (* copyVec copies into buf and returns true when the copied data calls for a flush *)
588 :     val bufLen = A.length buf
589 :     val dataLen = V.length v
590 :     in
591 :     if (dataLen >= bufLen)
592 :     then writeDirect() (* data does not fit in an empty buffer: skip buffering *)
593 :     else let
594 :     val i = !pos
595 :     val avail = bufLen - i
596 :     in
597 :     if (avail < dataLen)
598 :     then let (* fill the buffer to the brim, write it out, then buffer the remainder from index 0 *)
599 :     val _ = A.copyVec{
600 :     src=v, si=0, len=SOME avail, dst=buf, di=i
601 :     }
602 :     val _ = flushAll()
603 :     val needsFlush = copyVec(v, avail, dataLen-avail, buf, 0)
604 :     in
605 :     pos := dataLen-avail;
606 :     if needsFlush then flush () else ()
607 :     end
608 :     else let
609 :     val needsFlush = copyVec(v, 0, dataLen, buf, i)
610 :     in
611 :     pos := i + dataLen;
612 :     if (needsFlush orelse (avail = dataLen)) (* also flush when the buffer has just become full *)
613 :     then flush()
614 :     else ()
615 :     end
616 :     end
617 :     end
618 :     in
619 :     case !bufferMode
620 :     of IO.NO_BUF => writeDirect ()
621 :     | IO.LINE_BUF => insert lineBufCopyVec
622 :     | IO.BLOCK_BUF => insert blockBufCopyVec
623 :     (* end case *);
624 :     release()
625 :     end
626 :    
627 :     fun output1 (strmMV, elem) = let (* write a single element to the stream according to its buffer mode *)
628 :     val (strm as OSTRM{buf, pos, bufferMode, writeArr, ...}) =
629 :     lockAndChkClosedOut (strmMV, "output1")
630 :     fun release () = SV.mPut (strmMV, strm)
631 :     in
632 :     case !bufferMode
633 :     of IO.NO_BUF => ( (* unbuffered: slot 0 of buf is used as a one-element scratch area *)
634 :     arrUpdate (buf, 0, elem);
635 :     writeArr {buf=buf, i=0, sz=SOME 1}
636 :     handle ex => (release(); outputExn (strm, "output1", ex)))
637 :     | IO.LINE_BUF => let val i = !pos val i' = i+1
638 :     in
639 :     arrUpdate (buf, i, elem); pos := i';
640 :     if ((i' = A.length buf) orelse (isNL elem)) (* flush when the buffer is full or a newline was written *)
641 :     then flushBuffer (strmMV, strm, "output1")
642 :     else ()
643 :     end
644 :     | IO.BLOCK_BUF => let val i = !pos val i' = i+1
645 :     in
646 :     arrUpdate (buf, i, elem); pos := i';
647 :     if (i' = A.length buf) (* flush only when the buffer is full *)
648 :     then flushBuffer (strmMV, strm, "output1")
649 :     else ()
650 :     end
651 :     (* end case *);
652 :     release()
653 :     end
654 :    
655 :     fun flushOut strmMV = let (* lock the open stream, write out its buffered data, unlock *)
656 :     val locked = lockAndChkClosedOut (strmMV, "flushOut")
657 :     val () = flushBuffer (strmMV, locked, "flushOut")
658 :     in
659 :     SV.mPut (strmMV, locked)
660 :     end
661 :    
662 :     fun closeOut strmMV = let (* flush and close the stream; closing an already closed stream is a no-op *)
663 :     val (strm as OSTRM{writer=PIO.WR{close, ...}, closed, cleanTag, ...}) =
664 :     SV.mTake strmMV
665 :     in
666 :     if !closed
667 :     then ()
668 :     else (
669 :     flushBuffer (strmMV, strm, "closeOut"); (* on failure flushBuffer releases the lock itself *)
670 :     closed := true;
671 :     CleanIO.removeCleaner cleanTag;
672 :     close() handle ex => (SV.mPut (strmMV, strm); raise ex)); (* release the lock before re-raising, so the stream is not left locked forever *)
673 :     SV.mPut (strmMV, strm)
674 :     end
675 :    
676 :     fun mkOutstream' (wr as PIO.WR{chunkSize, writeArr, writeVec, ...}, mode) =
677 :     let
678 :     fun iterate f (buf, i, sz) = let
679 :     fun lp (_, 0) = ()
680 :     | lp (i, n) = let val n' = f{buf=buf, i=i, sz=SOME n}
681 :     in lp (i+n', n-n') end
682 :     in
683 :     lp (i, sz)
684 :     end
685 :     fun writeArr' {buf, i, sz} = let
686 :     val len = (case sz
687 :     of NONE => A.length buf - i
688 :     | (SOME n) => n
689 :     (* end case *))
690 :     in
691 :     iterate writeArr (buf, i, len)
692 :     end
693 :     fun writeVec' {buf, i, sz} = let
694 :     val len = (case sz
695 :     of NONE => V.length buf - i
696 :     | (SOME n) => n
697 :     (* end case *))
698 :     in
699 :     iterate writeVec (buf, i, len)
700 :     end
701 :     (* install a dummy cleaner *)
702 :     val tag = CleanIO.addCleaner dummyCleaner
703 :     val strm = SV.mVarInit (OSTRM{
704 :     buf = A.array(chunkSize, someElem),
705 :     pos = ref 0,
706 :     closed = ref false,
707 :     bufferMode = ref mode,
708 :     writer = wr,
709 :     writeArr = writeArr',
710 :     writeVec = writeVec',
711 :     cleanTag = tag
712 :     })
713 :     in
714 :     (tag, strm)
715 :     end
716 :     fun mkOutstream arg = let
717 :     val (tag, strm) = mkOutstream' arg
718 :     in
719 :     CleanIO.rebindCleaner (tag, fn () => closeOut strm);
720 :     strm
721 :     end
722 :    
723 :     fun getWriter strmMV = let (* snapshot the writer and the current buffer mode of an open stream *)
724 :     val (locked as OSTRM{writer=wr, bufferMode=modeRef, ...}) =
725 :     lockAndChkClosedOut (strmMV, "getWriter")
726 :     val result = (wr, !modeRef)
727 :     in SV.mPut(strmMV, locked); result
728 :     end
729 :    
730 :     (** Position operations on outstreams **)
731 :     datatype out_pos = OUTP of {
732 :     pos : PIO.pos,
733 :     strm : outstream
734 :     }
735 :    
736 :     fun getPosOut strmMV = let (* flush the stream, then capture the writer's current position as an out_pos *)
737 :     val (strm as OSTRM{writer, ...}) =
738 :     lockAndChkClosedOut (strmMV, "getPosOut")
739 :     fun release () = SV.mPut(strmMV, strm)
740 :     in
741 :     flushBuffer (strmMV, strm, "getPosOut");
742 :     (case writer (* parenthesized so that the trailing "before release()" applies to the whole case, not just its last arm *)
743 :     of PIO.WR{getPos=SOME f, ...} => (
744 :     OUTP{pos = f(), strm = strmMV}
745 :     handle ex => (release(); outputExn(strm, "getPosOut", ex)))
746 :     | _ => (
747 :     release();
748 :     outputExn(strm, "getPosOut", IO.RandomAccessNotSupported))
749 :     (* end case *))
750 :     before release() (* reached only on success; both failing paths release the lock themselves and raise *)
751 :     end
752 :     fun filePosOut (OUTP{pos, strm=strmMV}) = ( (* returns the position saved in the out_pos; raises IO.Io (ClosedStream) if its stream is closed *)
753 :     SV.mPut (strmMV, lockAndChkClosedOut (strmMV, "filePosOut")); (* lock only to run the closed check, then unlock at once *)
754 :     pos)
755 :     fun setPosOut (OUTP{pos, strm=strmMV}) = let (* move the writer of an open stream to the position saved in an out_pos *)
756 :     val (strm as OSTRM{writer, ...}) =
757 :     lockAndChkClosedOut (strmMV, "setPosOut")
758 :     fun release () = SV.mPut(strmMV, strm)
759 :     in
760 :     case writer
761 :     of PIO.WR{setPos=SOME f, ...} => (
762 :     (f pos)
763 :     handle ex => (release(); outputExn(strm, "setPosOut", ex)))
764 :     | _ => (
765 :     release();
766 :     outputExn(strm, "setPosOut", IO.RandomAccessNotSupported)) (* report this operation's own name, not "getPosOut" *)
767 :     (* end case *);
768 :     release()
769 :     end
770 :    
771 :     fun setBufferMode (strmMV, mode) = let (* change the buffer mode of an open stream *)
772 :     val (strm as OSTRM{bufferMode, ...}) =
773 :     lockAndChkClosedOut (strmMV, "setBufferMode")
774 :     in
775 :     if (mode = IO.NO_BUF) (* switching to unbuffered output: write out whatever is still buffered first *)
776 :     then flushBuffer (strmMV, strm, "setBufferMode")
777 :     else ();
778 :     bufferMode := mode;
779 :     SV.mPut (strmMV, strm)
780 :     end
781 :     fun getBufferMode strmMV = let
782 :     (** open question kept from the original: should closed streams be rejected here? **)
783 :     val (locked as OSTRM{bufferMode=modeRef, ...}) =
784 :     lockAndChkClosedOut (strmMV, "getBufferMode")
785 :     val mode = !modeRef
786 :     in SV.mPut (strmMV, locked); mode
787 :     end
788 :    
789 :     (** Text stream specific operations **)
790 :     fun outputSubstr (strmMV, ss) = let (* write substring ss to the stream according to its buffer mode; the lock is held throughout *)
791 :     val (strm as OSTRM os) = lockAndChkClosedOut (strmMV, "outputSubstr")
792 :     fun release () = SV.mPut (strmMV, strm)
793 :     val (v, dataStart, dataLen) = substringBase ss (* base string, start index and length of the substring *)
794 :     val {buf, pos, bufferMode, ...} = os
795 :     val bufLen = A.length buf
796 :     fun flush () = flushBuffer (strmMV, strm, "outputSubstr")
797 :     fun flushAll () = (#writeArr os {buf=buf, i=0, sz=NONE} (* write the entire buffer array, regardless of pos *)
798 :     handle ex => (release(); outputExn (strm, "outputSubstr", ex)))
799 :     fun writeDirect () = ( (* write any pending buffer contents, then the substring itself, bypassing the buffer *)
800 :     case !pos
801 :     of 0 => ()
802 :     | n => (#writeArr os {buf=buf, i=0, sz=SOME n}; pos := 0)
803 :     (* end case *);
804 :     #writeVec os {buf=v, i=dataStart, sz=SOME dataLen})
805 :     handle ex => (release(); outputExn (strm, "outputSubstr", ex))
806 :     fun insert copyVec = let (* copyVec copies into buf and returns true when the copied data calls for a flush *)
807 :     val bufLen = A.length buf
808 :     (* dataLen here is the substring length bound above; it must not be rebound to the length of the base string v *)
809 :     in
810 :     if (dataLen >= bufLen)
811 :     then writeDirect()
812 :     else let
813 :     val i = !pos
814 :     val avail = bufLen - i
815 :     in
816 :     if (avail < dataLen)
817 :     then let (* fill the buffer to the brim, write it out, then buffer the remainder from index 0 *)
818 :     val _ = A.copyVec{
819 :     src=v, si=dataStart, len=SOME avail,
820 :     dst=buf, di=i
821 :     }
822 :     val _ = flushAll()
823 :     val needsFlush = copyVec(v, dataStart+avail, dataLen-avail, buf, 0) (* the remainder starts avail characters into the substring *)
824 :     in
825 :     pos := dataLen-avail;
826 :     if needsFlush then flush () else ()
827 :     end
828 :     else let
829 :     val needsFlush = copyVec(v, dataStart, dataLen, buf, i)
830 :     in
831 :     pos := i + dataLen;
832 :     if (needsFlush orelse (avail = dataLen)) (* also flush when the buffer has just become full *)
833 :     then flush()
834 :     else ()
835 :     end
836 :     end
837 :     end
838 :     in
839 :     case !bufferMode
840 :     of IO.NO_BUF => writeDirect ()
841 :     | IO.LINE_BUF => insert lineBufCopyVec
842 :     | IO.BLOCK_BUF => insert blockBufCopyVec
843 :     (* end case *);
844 :     release()
845 :     end
846 :    
847 :     end (* StreamIO *)
848 :    
849 :     type vector = V.vector
850 :     type elem = V.elem
851 :     type instream = StreamIO.instream SV.mvar
852 :     type outstream = StreamIO.outstream SV.mvar
853 :    
854 :     (** Input operations **)
855 :     fun input strm = let val strm0 = SV.mTake strm (* lock the imperative stream by emptying its mvar *)
856 :     val (v, strm') = StreamIO.input strm0 handle ex => (SV.mPut (strm, strm0); raise ex) (* unlock with the unchanged stream before re-raising *)
857 :     in SV.mPut (strm, strm'); v
858 :     end
859 :     fun input1 strm = let val strm0 = SV.mTake strm (* lock the imperative stream by emptying its mvar *)
860 :     val res = StreamIO.input1 strm0 handle ex => (SV.mPut (strm, strm0); raise ex) (* unlock before re-raising *)
861 :     in case res of NONE => (SV.mPut (strm, strm0); NONE) (* no element: unlock with the unchanged stream instead of leaving the mvar empty *)
862 :     | (SOME(elem, strm')) => (SV.mPut (strm, strm'); SOME elem) end
863 :     fun inputN (strm, n) = let val strm0 = SV.mTake strm (* lock the imperative stream by emptying its mvar *)
864 :     val (v, strm') = StreamIO.inputN (strm0, n) handle ex => (SV.mPut (strm, strm0); raise ex) (* unlock with the unchanged stream before re-raising *)
865 :     in SV.mPut (strm, strm'); v
866 :     end
867 :     fun inputAll (strm : instream) = let
868 :     val strm0 = SV.mTake strm (* lock the imperative stream by emptying its mvar *)
869 :     val (v, strm') = StreamIO.inputAll strm0 handle ex => (SV.mPut (strm, strm0); raise ex) (* unlock with the unchanged stream before re-raising *)
870 :     in SV.mPut (strm, strm'); v
871 :     end
872 :    
873 :     (* event-value constructors *)
874 :     local
875 :     datatype 'a result = RES of 'a | EXN of exn
876 :     fun sendEvt (ch, v) = CML.sendEvt(ch, RES v)
877 :     fun sendExnEvt (ch, exn) = CML.sendEvt(ch, EXN exn)
878 :     fun recvEvt ch =
879 :     CML.wrap(CML.recvEvt ch, fn (RES v) => v | (EXN exn) => raise exn)
(* Shared driver for the event-valued input operations.
 *   inputEvt : maps a functional stream to an event yielding
 *              (result, advanced stream)
 *   strm     : the imperative stream
 *   nack     : negative-acknowledgement event (supplied by CML.withNack)
 * A helper thread takes the stream from its M-variable and synchronizes on
 * the input event.  On success it offers the result on a reply channel and,
 * once the send is accepted, stores the advanced stream.  If the input
 * raises, the exception is offered instead and the original stream is
 * stored.  At every synchronization point the nack event is an
 * alternative; choosing it stores the original stream.
 * The returned event receives from the reply channel.
 *)
fun doInput inputEvt (strm : instream) nack = let
      val reply = CML.channel()
      fun worker () = let
            val taken = SV.mTake strm
            (* store stream s back, ignoring the event's result *)
            fun putBack s _ = SV.mPut(strm, s)
            val aborted = CML.wrap(nack, putBack taken)
            fun deliver (result, advanced) = CML.select [
                    CML.wrap (sendEvt(reply, result), putBack advanced),
                    aborted
                  ]
            fun deliverExn exn = CML.select [
                    CML.wrap (sendExnEvt(reply, exn), putBack taken),
                    aborted
                  ]
            in
              (CML.select [CML.wrap (inputEvt taken, deliver), aborted])
                handle exn => deliverExn exn
            end
      in
        ignore (CML.spawn worker);
        recvEvt reply
      end
904 :     in
(* Event-valued input1.  StreamIO.input1Evt yields an option, so it is
 * repackaged into the (result, stream) shape doInput expects; on NONE the
 * stream is left unchanged.
 *)
fun input1Evt (strm : instream) = let
      fun evtOf (fs : StreamIO.instream) = let
            fun repack (SOME(c, fs')) = (SOME c, fs')
              | repack NONE = (NONE, fs)
            in
              CML.wrap (StreamIO.input1Evt fs, repack)
            end
      in
        CML.withNack (doInput evtOf strm)
      end
(* event-valued versions of input, inputN and inputAll, built on doInput *)
fun inputEvt strm =
      CML.withNack (fn nack => doInput StreamIO.inputEvt strm nack)
fun inputNEvt (strm, n) = let
      fun evtOf fs = StreamIO.inputNEvt (fs, n)
      in
        CML.withNack (fn nack => doInput evtOf strm nack)
      end
fun inputAllEvt strm =
      CML.withNack (fn nack => doInput StreamIO.inputAllEvt strm nack)
916 :     end (* local *)
917 :    
(* query StreamIO.canInput on the current stream without removing it
 * from the M-variable.
 *)
fun canInput (strm, n) = let
      val cur = SV.mGet strm
      in
        StreamIO.canInput (cur, n)
      end
(* peek at the next element: reads from the current functional stream but
 * discards the advanced stream, so the imperative stream is not changed.
 *)
fun lookahead (strm : instream) =
      Option.map (fn (elem, _) => elem) (StreamIO.input1 (SV.mGet strm))
(* Close the imperative input stream and leave it positioned at the
 * end-of-stream marker of its buffer chain (StreamIO.findEOS).
 * If StreamIO.closeIn raises, the original stream is put back into the
 * M-variable before re-raising so the stream is not left locked.
 * The unused `data` field binding of the original pattern was dropped.
 *)
fun closeIn strm = let
      val (s as StreamIO.ISTRM(buf, _)) = SV.mTake strm
      in
        ((StreamIO.closeIn s)
          handle ex => (SV.mPut (strm, s); raise ex));
        SV.mPut (strm, StreamIO.findEOS buf)
      end
(* test the current functional stream for end-of-stream *)
fun endOfStream strm = let
      val cur = SV.mGet strm
      in
        StreamIO.endOfStream cur
      end
931 :     (*
932 :     fun getPosIn strm = StreamIO.getPosIn(SV.mGet strm)
933 :     fun setPosIn (strm, p) = mUpdate(strm, StreamIO.setPosIn p)
934 :     *)
935 :    
936 :     (** Output operations **)
(* Each output operation reads the current functional outstream from the
 * M-variable (without removing it) and delegates to StreamIO.
 *)
fun output (strm, v) = let val out = SV.mGet strm in StreamIO.output (out, v) end
fun output1 (strm, c) = let val out = SV.mGet strm in StreamIO.output1 (out, c) end
fun flushOut strm = let val out = SV.mGet strm in StreamIO.flushOut out end
fun closeOut strm = let val out = SV.mGet strm in StreamIO.closeOut out end
fun getPosOut strm = let val out = SV.mGet strm in StreamIO.getPosOut out end
(* rebind the imperative stream to the functional stream recorded in the
 * position value, then reposition that stream.
 *)
fun setPosOut (strm, p as StreamIO.OUTP{strm=target, ...}) = let
      val () = mUpdate (strm, target)
      in
        StreamIO.setPosOut p
      end
944 :    
(* create / inspect / replace the functional stream behind an instream *)
fun mkInstream (fs : StreamIO.instream) = SV.mVarInit fs
fun getInstream (ins : instream) = SV.mGet ins
fun setInstream (ins : instream, fs) = mUpdate (ins, fs)
948 :    
(* create / inspect / replace the functional stream behind an outstream *)
fun mkOutstream (fs : StreamIO.outstream) = SV.mVarInit fs
fun getOutstream (outs : outstream) = SV.mGet outs
fun setOutstream (outs : outstream, fs) = mUpdate (outs, fs)
952 :    
(* choose the buffering mode for a writer: line buffering when its I/O
 * descriptor is a tty, block buffering otherwise (including writers that
 * have no descriptor).
 *)
fun bufferMode (PIO.WR{ioDesc, ...}) = (case ioDesc
       of SOME iod =>
            if (OS.IO.kind iod = OS.IO.Kind.tty) then IO.LINE_BUF else IO.BLOCK_BUF
        | NONE => IO.BLOCK_BUF
      (* end case *))
957 :    
958 :     (** Open files **)
(* open the named file for reading; any exception raised while opening is
 * re-raised wrapped in IO.Io.
 *)
fun openIn fname = let
      val rd = OSPrimIO.openRd fname
      in
        mkInstream (StreamIO.mkInstream (rd, empty))
      end
        handle ex => raise IO.Io{function="openIn", name=fname, cause=ex}
(* Open the named file for writing, with the buffering mode chosen by
 * bufferMode.  The handler now covers the OSPrimIO.openWr call as well:
 * previously the writer was opened outside the handler, so a failure to
 * open the file escaped without the IO.Io wrapper, unlike openIn and
 * openAppend.
 *)
fun openOut fname = let
      val wr = OSPrimIO.openWr fname
      in
        mkOutstream (StreamIO.mkOutstream (wr, bufferMode wr))
      end
        handle ex => raise IO.Io{function="openOut", name=fname, cause=ex}
(* open the named file for appending (unbuffered); any exception raised
 * while opening is re-raised wrapped in IO.Io.
 *)
fun openAppend fname = let
      val wr = OSPrimIO.openApp fname
      in
        mkOutstream (StreamIO.mkOutstream (wr, IO.NO_BUF))
      end
        handle ex => raise IO.Io{function="openAppend", name=fname, cause=ex}
971 :    
972 :     (** Text stream specific operations **)
(* Read a line from the imperative stream via StreamIO.inputLine.
 * If the read raises, the original stream is put back into the M-variable
 * before re-raising, so the stream is not left permanently locked.
 *)
fun inputLine strm = let
      val strm0 = SV.mTake strm
      val (s, strm') = StreamIO.inputLine strm0
            handle ex => (SV.mPut (strm, strm0); raise ex)
      in
        SV.mPut (strm, strm'); s
      end
(* write a substring to the current functional outstream *)
fun outputSubstr (strm, ss) = let
      val out = SV.mGet strm
      in
        StreamIO.outputSubstr (out, ss)
      end
(* open an instream that reads from the given string; failures are
 * reported as IO.Io with name "<string>".
 *)
fun openString src = let
      val rd = OSPrimIO.strReader src
      in
        mkInstream (StreamIO.mkInstream (rd, empty))
      end
        handle ex => raise IO.Io{function="openIn", name="<string>", cause=ex}
981 :    
(* channel-backed readers/writers, instantiated for characters *)
structure ChanIO = ChanIOFn(
    structure A = CharArray
    structure V = CharVector
    structure PrimIO = PIO)
986 :    
(* open an instream that is connected to the output port of a channel. *)
fun openChanIn ch = let
      val rd = ChanIO.mkReader ch
      in
        mkInstream (StreamIO.mkInstream (rd, empty))
      end
990 :    
(* open an unbuffered outstream that is connected to the input port of a
 * channel.
 *)
fun openChanOut ch = let
      val wr = ChanIO.mkWriter ch
      in
        mkOutstream (StreamIO.mkOutstream (wr, IO.NO_BUF))
      end
994 :    
995 :     (** Standard streams **)
996 :     local
997 :     structure SIO = StreamIO
(* build a functional stream on standard input; when rebind is true its
 * cleaner is replaced by the no-op dummyCleaner.
 *)
fun mkStdIn rebind = let
      val (tag, strm) = SIO.mkInstream'(OSPrimIO.stdIn(), empty)
      val () = if rebind
            then CleanIO.rebindCleaner (tag, dummyCleaner)
            else ()
      in
        strm
      end
(* build a functional stream on standard output, buffered according to
 * bufferMode; when rebind is true its cleaner is set to flush the stream.
 *)
fun mkStdOut rebind = let
      val wr = OSPrimIO.stdOut()
      val (tag, strm) = SIO.mkOutstream'(wr, bufferMode wr)
      fun flush () = SIO.flushOut strm
      val () = if rebind then CleanIO.rebindCleaner (tag, flush) else ()
      in
        strm
      end
(* build an unbuffered functional stream on standard error; when rebind is
 * true its cleaner is set to flush the stream.
 *)
fun mkStdErr rebind = let
      val (tag, strm) = SIO.mkOutstream'(OSPrimIO.stdErr(), IO.NO_BUF)
      fun flush () = SIO.flushOut strm
      val () = if rebind then CleanIO.rebindCleaner (tag, flush) else ()
      in
        strm
      end
1023 :     in
(* Build the standard streams.  CML is not running when this code is
 * evaluated, so the cleaners cannot be rebound here (hence `false`);
 * that is fine because these values are only placeholders.
 *)
val stdIn = mkInstream (mkStdIn false)
val stdOut = mkOutstream (mkStdOut false)
val stdErr = mkOutstream (mkStdErr false)
1031 :    
(* Write a string to stdOut and flush it.  The stream is taken out of the
 * stdOut M-variable while writing.  If output or flush raises, the stream
 * is put back before the exception is re-raised; otherwise stdOut would
 * stay empty and every later print would block.
 *)
fun print s = let
      val strm' = SV.mTake stdOut
      in
        ((StreamIO.output (strm', s); StreamIO.flushOut strm')
          handle ex => (SV.mPut (stdOut, strm'); raise ex));
        SV.mPut (stdOut, strm')
      end
1037 :    
(* Lift a scanner over functional streams to one over imperative streams.
 * scanFn is specialised to StreamIO.input1 once; the resulting function
 * scans from the current functional stream and, on success, stores the
 * remaining stream.  On failure the imperative stream is left unchanged.
 * NOTE(review): the stream is read with getInstream and written with
 * setInstream as two separate steps -- confirm whether concurrent users
 * of the same stream need this to be atomic.
 *)
fun scanStream scanFn = let
      val scan = scanFn StreamIO.input1
      in
        fn strm => (case scan (getInstream strm)
           of SOME(item, rest) => (setInstream (strm, rest); SOME item)
            | NONE => NONE
          (* end case *))
      end
1053 :    
(* Install the hook that rebuilds the I/O stack: it recreates the three
 * standard streams with their cleaners rebound and then directs the
 * SML/NJ print hook to this structure's print.
 *)
val _ = let
      fun rebuild () = (
            setInstream (stdIn, mkStdIn true);
            setOutstream (stdOut, mkStdOut true);
            setOutstream (stdErr, mkStdErr true);
            SMLofNJ.Internals.prHook := print)
      in
        CleanIO.stdStrmHook := rebuild
      end
1060 :     end (* local *)
1061 :    
1062 :     end (* TextIOFn *)

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0