Home My Page Projects Code Snippets Project Openings SML/NJ
Summary Activity Forums Tracker Lists Tasks Docs Surveys News SCM Files

SCM Repository

[smlnj] Annotation of /sml/trunk/src/cml/src/IO/text-io-fn.sml
ViewVC logotype

Annotation of /sml/trunk/src/cml/src/IO/text-io-fn.sml

Parent Directory Parent Directory | Revision Log Revision Log


Revision 8 - (view) (download)
Original Path: sml/branches/SMLNJ/src/cml/src/IO/text-io-fn.sml

1 : monnier 2 (* text-io-fn.sml
2 :     *
3 :     * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
4 :     *
5 :     * This is the CML version of the TextIO functor.
6 :     *)
7 :    
8 :     functor TextIOFn (
9 :    
10 :     structure OSPrimIO : sig
11 :     include OS_PRIM_IO
12 :     val stdIn : unit -> PrimIO.reader
13 :     val stdOut : unit -> PrimIO.writer
14 :     val stdErr : unit -> PrimIO.writer
15 :     val strReader : string -> PrimIO.reader
16 :     end
17 :     where type PrimIO.array = TextPrimIO.array
18 :     where type PrimIO.vector = TextPrimIO.vector
19 :     where type PrimIO.elem = TextPrimIO.elem
20 :     where type PrimIO.pos = TextPrimIO.pos
21 :     where type PrimIO.reader = TextPrimIO.reader
22 :     where type PrimIO.writer = TextPrimIO.writer
23 :    
24 :     ) : CML_TEXT_IO = struct
25 :    
26 :     structure PIO = OSPrimIO.PrimIO
27 :     structure A = CharArray
28 :     structure V = CharVector
29 :    
30 :     structure SV = SyncVar
31 :    
32 :     (* assign to an MVar *)
33 :     fun mUpdate (mv, x) = (SV.mTake mv; SV.mPut(mv, x))
34 :    
35 :     (* an element for initializing buffers *)
36 :     val someElem = #"\000"
37 :    
38 :     val vecExtract = V.extract
39 :     val vecSub = V.sub
40 :     val arrUpdate = A.update
41 :     val substringBase = Substring.base
42 :     val empty = ""
43 :    
44 :     fun dummyCleaner () = ()
45 :    
46 :     structure StreamIO =
47 :     struct
48 :     type vector = V.vector
49 :     type elem = V.elem
50 :     type reader = PIO.reader
51 :     type writer = PIO.writer
52 :     type pos = PIO.pos
53 :    
54 :     (*** Functional input streams ***)
55 :     datatype instream = ISTRM of (in_buffer * int)
56 :     and in_buffer = IBUF of {
57 :     basePos : pos option,
58 :     more : more SV.mvar, (* when this cell is empty, it means that *)
59 :     (* there is an outstanding request to the *)
60 :     (* server to extend the stream. *)
61 :     data : vector,
62 :     info : info
63 :     }
64 :     and more
65 :     = MORE of in_buffer (* forward link to additional data *)
66 :     | NOMORE (* placeholder for forward link *)
67 :     | TERMINATED (* termination of the stream *)
68 :    
69 :     and info = INFO of {
70 :     reader : reader,
71 :     readVec : int -> vector,
72 :     readVecEvt : int -> vector CML.event,
73 :     closed : bool ref,
74 :     getPos : unit -> pos option,
75 :     tail : more SV.mvar SV.mvar,
76 :     (* points to the more cell of the last buffer *)
77 :     cleanTag : CleanIO.tag
78 :     }
79 :    
80 :     fun infoOfIBuf (IBUF{info, ...}) = info
81 :     fun chunkSzOfIBuf buf = let
82 :     val INFO{reader=PIO.RD{chunkSize, ...}, ...} = infoOfIBuf buf
83 :     in
84 :     chunkSize
85 :     end
86 :     fun readVec (IBUF{info=INFO{readVec=f, ...}, ...}) = f
87 :    
88 :     fun inputExn (INFO{reader=PIO.RD{name, ...}, ...}, mlOp, exn) =
89 :     raise IO.Io{function=mlOp, name=name, cause=exn}
90 :    
91 :     datatype more_data = EOF | DATA of in_buffer
92 :    
93 :     (* extend the stream by a chunk.
94 :     * Invariant: the more m-variable is empty on entry and full on exit.
95 :     *)
96 :     fun extendStream (readFn, mlOp, buf as IBUF{more, info, ...}) = (let
97 :     val INFO{getPos, tail, ...} = info
98 :     val basePos = getPos()
99 :     val chunk = readFn (chunkSzOfIBuf buf)
100 :     in
101 :     if (V.length chunk = 0)
102 :     then (SV.mPut (more, NOMORE); EOF)
103 :     else let
104 :     val newMore = SV.mVar()
105 :     val buf' = IBUF{
106 :     basePos = basePos, data = chunk,
107 :     more = newMore, info = info
108 :     }
109 :     in
110 :     (* note that we do not fill the newMore cell until
111 :     * after the tail has been updated. This ensures
112 :     * that someone attempting to access the tail will
113 :     * not acquire the lock until after we are done.
114 :     *)
115 :     mUpdate (tail, newMore);
116 :     SV.mPut (more, MORE buf'); (* releases lock!! *)
117 :     SV.mPut (newMore, NOMORE);
118 :     DATA buf'
119 :     end
120 :     end
121 :     handle ex => (
122 :     SV.mPut (more, NOMORE);
123 :     inputExn(info, mlOp, ex)))
124 :    
125 :     (* get the next buffer in the stream, extending it if necessary. If
126 :     * the stream must be extended, we lock it by taking the value from the
127 :     * more cell; the extendStream function is responsible for filling in
128 :     * the cell.
129 :     *)
130 :     fun getBuffer (readFn, mlOp) (buf as IBUF{more, info, ...}) = let
131 :     fun get TERMINATED = EOF
132 :     | get (MORE buf') = DATA buf'
133 :     | get NOMORE = (case SV.mTake more
134 :     of NOMORE => extendStream (readFn, mlOp, buf)
135 :     | next => (SV.mPut(more, next); get next)
136 :     (* end case *))
137 :     in
138 :     get (SV.mGet more)
139 :     end
140 :    
141 :     (* read a chunk that is at least the specified size *)
142 :     fun readChunk buf = let
143 :     val INFO{readVec, reader=PIO.RD{chunkSize, ...}, ...} =
144 :     infoOfIBuf buf
145 :     in
146 :     case (chunkSize - 1)
147 :     of 0 => (fn n => readVec n)
148 :     | k => (* round up to next multiple of chunkSize *)
149 :     (fn n => readVec(Int.quot(n+k, chunkSize) * chunkSize))
150 :     (* end case *)
151 :     end
152 :    
153 :     fun generalizedInput getBuf = let
154 :     fun get (ISTRM(buf as IBUF{data, ...}, pos)) = let
155 :     val len = V.length data
156 :     in
157 :     if (pos < len)
158 :     then (vecExtract(data, pos, NONE), ISTRM(buf, len))
159 :     else (case (getBuf buf)
160 :     of EOF => (empty, ISTRM(buf, len))
161 :     | (DATA rest) => get (ISTRM(rest, 0))
162 :     (* end case *))
163 :     end
164 :     in
165 :     get
166 :     end
167 :    
168 :     (* terminate an input stream *)
169 :     fun terminate (info as INFO{tail, cleanTag, ...}) = let
170 :     val m = SV.mGet tail
171 :     in
172 :     case SV.mTake m
173 :     of (m' as MORE _) => (SV.mPut(m, m'); terminate info)
174 :     | TERMINATED => SV.mPut(m, TERMINATED)
175 :     | _ => (
176 :     CleanIO.removeCleaner cleanTag;
177 :     SV.mPut(m, TERMINATED))
178 :     (* end case *)
179 :     end
180 :    
181 :     (* find the end of the stream *)
182 :     fun findEOS (buf as IBUF{more, data, ...}) = (case (SV.mGet more)
183 :     of (MORE buf) => findEOS buf
184 :     | _ => ISTRM(buf, V.length data)
185 :     (* end case *))
186 :    
187 :     fun input (strm as ISTRM(buf, _)) =
188 :     generalizedInput (getBuffer (readVec buf, "input")) strm
189 :     fun input1 (ISTRM(buf, pos)) = let
190 :     val IBUF{data, more, ...} = buf
191 :     in
192 :     if (pos < V.length data)
193 :     then SOME(vecSub(data, pos), ISTRM(buf, pos+1))
194 :     else let
195 :     fun get (MORE buf) = input1 (ISTRM(buf, 0))
196 :     | get TERMINATED = NONE
197 :     | get NOMORE = (case SV.mTake more
198 :     of NOMORE => (
199 :     case extendStream (readVec buf, "input1", buf)
200 :     of EOF => NONE
201 :     | (DATA rest) => input1 (ISTRM(rest, 0))
202 :     (* end case *))
203 :     | next => (SV.mPut(more, next); get next)
204 :     (* end case *))
205 :     in
206 :     get (SV.mGet more)
207 :     end
208 :     end
209 :     fun inputN (ISTRM(buf, pos), n) = let
210 :     fun join (item, (list, strm)) = (item::list, strm)
211 :     fun inputList (buf as IBUF{data, ...}, i, n) = let
212 :     val len = V.length data
213 :     val remain = len-i
214 :     in
215 :     if (remain >= n)
216 :     then ([vecExtract(data, i, SOME n)], ISTRM(buf, i+n))
217 :     else join (
218 :     vecExtract(data, i, NONE),
219 :     nextBuf(buf, n-remain))
220 :     end
221 :     and nextBuf (buf as IBUF{more, data, ...}, n) = let
222 :     fun get (MORE buf) = inputList (buf, 0, n)
223 :     | get TERMINATED = ([], ISTRM(buf, V.length data))
224 :     | get NOMORE = (case (SV.mTake more)
225 :     of NOMORE => (case extendStream (readVec buf, "inputN", buf)
226 :     of EOF => ([], ISTRM(buf, V.length data))
227 :     | (DATA rest) => inputList (rest, 0, n)
228 :     (* end case *))
229 :     | next => (SV.mPut(more, next); get next)
230 :     (* end case *))
231 :     in
232 :     get (SV.mGet more)
233 :     end
234 :     val (data, strm) = inputList (buf, pos, n)
235 :     in
236 :     (V.concat data, strm)
237 :     end
238 :    
239 :     fun inputAll (strm as ISTRM(buf, _)) = let
240 :     val INFO{reader=PIO.RD{avail, ...}, ...} = infoOfIBuf buf
241 :     (* read a chunk that is as large as the available input. Note
242 :     * that for systems that use CR-LF for #"\n", the size will be
243 :     * too large, but this should be okay.
244 :     *)
245 :     fun bigChunk _ = let
246 :     val delta = (case avail()
247 :     of NONE => chunkSzOfIBuf buf
248 :     | (SOME n) => n
249 :     (* end case *))
250 :     in
251 :     readChunk buf delta
252 :     end
253 :     val bigInput =
254 :     generalizedInput (getBuffer (bigChunk, "inputAll"))
255 :     fun loop (v, strm) =
256 :     if (V.length v = 0) then [] else v :: loop(bigInput strm)
257 :     val data = V.concat (loop (bigInput strm))
258 :     in
259 :     (data, findEOS buf)
260 :     end
261 :    
262 :     fun input1Evt _ = raise Fail "input1Evt unimplemented"
263 :     fun inputEvt _ = raise Fail "inputEvt unimplemented"
264 :     fun inputNEvt _ = raise Fail "inputNEvt unimplemented"
265 :     fun inputAllEvt _ = raise Fail "inputAllEvt unimplemented"
266 :     fun inputLineEvt _ = raise Fail "inputLineEvt unimplemented"
267 :    
268 :     (* Return SOME k, if k <= amount characters can be read without blocking. *)
269 :     fun canInput (strm as ISTRM(buf, pos), amount) = let
270 :     (******
271 :     val readVecNB = (case buf
272 :     of (IBUF{info as INFO{readVecNB=NONE, ...}, ...}) =>
273 :     inputExn(info, "canInput", IO.NonblockingNotSupported)
274 :     | (IBUF{info=INFO{readVecNB=SOME f, ...}, ...}) => f
275 :     (* end case *))
276 :     ******)
277 :     fun tryInput (buf as IBUF{data, ...}, i, n) = let
278 :     val len = V.length data
279 :     val remain = len - i
280 :     in
281 :     if (remain >= n)
282 :     then SOME n
283 :     else nextBuf (buf, n - remain)
284 :     end
285 :     and nextBuf (IBUF{more, ...}, n) = let
286 :     fun get (MORE buf) = tryInput (buf, 0, n)
287 :     | get TERMINATED = SOME(amount - n)
288 :     (******
289 :     | get NOMORE = (case SV.mTake more
290 :     of NOMORE => ((
291 :     case extendStream (readVecNB, "canInput", buf)
292 :     of EOF => SOME(amount - n)
293 :     | (DATA b) => tryInput (b, 0, n)
294 :     (* end case *))
295 :     handle IO.Io{cause=WouldBlock, ...} => SOME(amount - n))
296 :     | next => (SV.mPut(more, next); get next)
297 :     (* end case *))
298 :     ******)
299 :     | get NOMORE = SOME(amount - n)
300 :     in
301 :     get (SV.mGet more)
302 :     end
303 :     in
304 :     if (amount < 0)
305 :     then raise Size
306 :     else tryInput (buf, pos, amount)
307 :     end
308 :     (*** We need some kind of lock on the input stream to do this safely!!! ***)
309 :     fun closeIn (ISTRM(buf, _)) = (case (infoOfIBuf buf)
310 :     of INFO{closed=ref true, ...} => ()
311 :     | (info as INFO{closed, reader=PIO.RD{close, ...}, ...}) => (
312 :     terminate info;
313 :     closed := true;
314 :     close() handle ex => inputExn(info, "closeIn", ex))
315 :     (* end case *))
316 :     fun endOfStream (ISTRM(buf as IBUF{more, ...}, pos)) = (
317 :     case SV.mTake more
318 :     of (next as MORE _) => (SV.mPut(more, next); false)
319 :     | next => let
320 :     val IBUF{data, info=INFO{closed, ...}, ...} = buf
321 :     in
322 :     if (pos = V.length data)
323 :     then (case (next, !closed)
324 :     of (NOMORE, false) => (
325 :     case extendStream (readVec buf, "endOfStream", buf)
326 :     of EOF => true
327 :     | _ => false
328 :     (* end case *))
329 :     | _ => (SV.mPut(more, next); true)
330 :     (* end case *))
331 :     else (SV.mPut(more, next); false)
332 :     end
333 :     (* end case *))
334 : monnier 8 fun mkInstream' (reader, optData) = let
335 : monnier 2 val PIO.RD{readVec, readVecEvt, getPos, setPos, ...} = reader
336 :     val getPos = (case (getPos, setPos)
337 :     of (SOME f, SOME _) => (fn () => SOME(f()))
338 :     | _ => (fn () => NONE)
339 :     (* end case *))
340 :     val more = SV.mVarInit NOMORE
341 :     val closedFlg = ref false
342 :     val tag = CleanIO.addCleaner dummyCleaner
343 :     val info = INFO{
344 :     reader=reader, readVec=readVec, readVecEvt=readVecEvt,
345 :     closed = closedFlg, getPos = getPos,
346 :     tail = SV.mVarInit more, cleanTag = tag
347 :     }
348 :     val buf = (case optData
349 :     of NONE => IBUF{
350 :     basePos = getPos(), data=empty,
351 :     info=info, more=more
352 :     }
353 :     (** What should we do about the position in this case ?? **)
354 :     (** Suggestion: When building a stream with supplied initial data,
355 :     ** nothing can be said about the positions inside that initial
356 :     ** data (who knows where that data even came from!).
357 :     **)
358 :     | (SOME v) => IBUF{
359 :     basePos = NONE, data=v,
360 :     info=info, more=more}
361 :     (* end case *))
362 :     val strm = ISTRM(buf, 0)
363 :     in
364 : monnier 8 (tag, strm)
365 :     end
366 :     fun mkInstream arg = let
367 :     val (tag, strm) = mkInstream' arg
368 :     in
369 : monnier 2 CleanIO.rebindCleaner (tag, fn () => closeIn strm);
370 :     strm
371 :     end
372 :     fun getReader (ISTRM(buf, pos)) = let
373 :     val IBUF{data, info as INFO{reader, ...}, more, ...} = buf
374 :     fun getData more = (case SV.mGet more
375 :     of (MORE(IBUF{data, more=more', ...})) => data :: getData more'
376 :     | _ => []
377 :     (* end case *))
378 :     in
379 :     terminate info;
380 :     if (pos < V.length data)
381 :     then (
382 :     reader,
383 :     V.concat(vecExtract(data, pos, NONE) :: getData more)
384 :     )
385 :     else (reader, V.concat(getData more))
386 :     end
387 :    
388 :     (** Position operations on instreams **)
389 :     datatype in_pos = INP of {
390 :     base : pos,
391 :     offset : int,
392 :     info : info
393 :     }
394 :    
395 :     fun getPosIn (ISTRM(buf, pos)) = (case buf
396 :     of IBUF{basePos=NONE, info, ...} =>
397 :     inputExn (info, "getPosIn", IO.RandomAccessNotSupported)
398 :     | IBUF{basePos=SOME p, info, ...} => INP{
399 :     base = p, offset = pos, info = info
400 :     }
401 :     (* end case *))
402 :     fun filePosIn (INP{base, offset, ...}) =
403 :     Position.+(base, Position.fromInt offset)
404 :     fun setPosIn (pos as INP{info as INFO{reader, ...}, ...}) = let
405 :     val fpos = filePosIn pos
406 :     val (PIO.RD rd) = reader
407 :     in
408 :     terminate info;
409 :     valOf (#setPos rd) fpos;
410 :     mkInstream (PIO.RD rd, NONE)
411 :     end
412 :    
413 :     (** Text stream specific operations **)
414 :     fun inputLine (ISTRM(buf as IBUF{data, ...}, pos)) = let
415 :     fun join (item, (list, strm)) = (item::list, strm)
416 :     fun nextBuf (isEmpty, buf as IBUF{more, data, ...}) = let
417 :     fun last () =
418 :     (if isEmpty then [] else ["\n"], ISTRM(buf, V.length data))
419 :     fun get (MORE buf) = scanData (buf, 0)
420 :     | get NOMORE = (case (SV.mTake more)
421 :     of NOMORE => (
422 :     case extendStream (readVec buf, "inputLine", buf)
423 :     of EOF => last ()
424 :     | (DATA rest) => scanData (rest, 0)
425 :     (* end case *))
426 :     | next => (SV.mPut(more, next); get next)
427 :     (* end case *))
428 :     | get TERMINATED = last()
429 :     in
430 :     get (SV.mGet more)
431 :     end
432 :     and scanData (buf as IBUF{data, ...}, i) = let
433 :     val len = V.length data
434 :     fun scan j = if (j = len)
435 :     then join(vecExtract(data, i, NONE), nextBuf(false, buf))
436 :     else if (vecSub(data, j) = #"\n")
437 :     then ([vecExtract(data, i, SOME(j+1-i))], ISTRM(buf, j+1))
438 :     else scan (j+1)
439 :     in
440 :     scan i
441 :     end
442 :     val (data, strm) = if (V.length data = pos)
443 :     then nextBuf (true, buf)
444 :     else scanData (buf, pos)
445 :     in
446 :     (V.concat data, strm)
447 :     end
448 :    
449 :     (*** Output streams ***)
450 :    
451 :     (* an output stream is implemented as a monitor using an mvar to
452 :     * hold its data.
453 :     *)
454 :     datatype ostrm_info = OSTRM of {
455 :     buf : A.array,
456 :     pos : int ref,
457 :     closed : bool ref,
458 :     bufferMode : IO.buffer_mode ref,
459 :     writer : writer,
460 :     writeArr : {buf : A.array, i : int, sz : int option} -> unit,
461 :     writeVec : {buf : V.vector, i : int, sz : int option} -> unit,
462 :     cleanTag : CleanIO.tag
463 :     }
464 :    
465 :     type outstream = ostrm_info SV.mvar
466 :    
467 :     fun isNL #"\n" = true
468 :     | isNL _ = false
469 :    
470 :     fun isLineBreak (OSTRM{bufferMode, ...}) =
471 :     if (!bufferMode = IO.LINE_BUF) then isNL else (fn _ => false)
472 :    
473 :     fun outputExn (OSTRM{writer=PIO.WR{name, ...}, ...}, mlOp, exn) =
474 :     raise IO.Io{function=mlOp, name=name, cause=exn}
475 :    
476 :     (* lock access to the stream and make sure that it is not closed. *)
477 :     fun lockAndChkClosedOut (strmMV, mlOp) = (case SV.mTake strmMV
478 :     of (strm as OSTRM({closed=ref true, ...})) => (
479 :     SV.mPut (strmMV, strm);
480 :     outputExn (strm, mlOp, IO.ClosedStream))
481 :     | strm => strm
482 :     (* end case *))
483 :    
484 :     fun flushBuffer (strmMV, strm as OSTRM{buf, pos, writeArr, ...}, mlOp) = (
485 :     case !pos
486 :     of 0 => ()
487 :     | n => ((
488 :     writeArr {buf=buf, i=0, sz=SOME n}; pos := 0)
489 :     handle ex => (
490 :     SV.mPut(strmMV, strm); outputExn (strm, mlOp, ex)))
491 :     (* end case *))
492 :    
493 :     (* A version of copyVec that checks for newlines, while it is copying.
494 :     * This is used for LINE_BUF output of strings and substrings.
495 :     *)
496 :     fun lineBufCopyVec (src, srcI, srcLen, dst, dstI) = let
497 :     val stop = srcI+srcLen
498 :     fun cpy (srcI, dstI, lb) =
499 :     if (srcI < stop)
500 :     then let val c = vecSub(src, srcI)
501 :     in
502 :     arrUpdate (dst, dstI, c);
503 :     cpy (srcI+1, dstI+1, lb orelse isNL c)
504 :     end
505 :     else lb
506 :     in
507 :     cpy (srcI, dstI, false)
508 :     end
509 :    
510 :     (* a version of copyVec for BLOCK_BUF output of strings and substrings. *)
511 :     fun blockBufCopyVec (src, srcI, srcLen, dst, dstI) = (
512 :     A.copyVec {
513 :     src = src, si = srcI, len = SOME srcLen, dst = dst, di = dstI
514 :     };
515 :     false)
516 :    
517 :     fun output (strmMV, v) = let
518 :     val (strm as OSTRM os) = lockAndChkClosedOut (strmMV, "output")
519 :     fun release () = SV.mPut (strmMV, strm)
520 :     val {buf, pos, bufferMode, ...} = os
521 :     fun flush () = flushBuffer (strmMV, strm, "output")
522 :     fun flushAll () = (#writeArr os {buf=buf, i=0, sz=NONE}
523 :     handle ex => (release(); outputExn (strm, "output", ex)))
524 :     fun writeDirect () = (
525 :     case !pos
526 :     of 0 => ()
527 :     | n => (#writeArr os {buf=buf, i=0, sz=SOME n}; pos := 0)
528 :     (* end case *);
529 :     #writeVec os {buf=v, i=0, sz=NONE})
530 :     handle ex => (release(); outputExn (strm, "output", ex))
531 :     fun insert copyVec = let
532 :     val bufLen = A.length buf
533 :     val dataLen = V.length v
534 :     in
535 :     if (dataLen >= bufLen)
536 :     then writeDirect()
537 :     else let
538 :     val i = !pos
539 :     val avail = bufLen - i
540 :     in
541 :     if (avail < dataLen)
542 :     then let
543 :     val _ = A.copyVec{
544 :     src=v, si=0, len=SOME avail, dst=buf, di=i
545 :     }
546 :     val _ = flushAll()
547 :     val needsFlush = copyVec(v, avail, dataLen-avail, buf, 0)
548 :     in
549 :     pos := dataLen-avail;
550 :     if needsFlush then flush () else ()
551 :     end
552 :     else let
553 :     val needsFlush = copyVec(v, 0, dataLen, buf, i)
554 :     in
555 :     pos := i + dataLen;
556 :     if (needsFlush orelse (avail = dataLen))
557 :     then flush()
558 :     else ()
559 :     end
560 :     end
561 :     end
562 :     in
563 :     case !bufferMode
564 :     of IO.NO_BUF => writeDirect ()
565 :     | IO.LINE_BUF => insert lineBufCopyVec
566 :     | IO.BLOCK_BUF => insert blockBufCopyVec
567 :     (* end case *);
568 :     release()
569 :     end
570 :    
571 :     fun output1 (strmMV, elem) = let
572 :     val (strm as OSTRM{buf, pos, bufferMode, writeArr, ...}) =
573 :     lockAndChkClosedOut (strmMV, "output1")
574 :     fun release () = SV.mPut (strmMV, strm)
575 :     in
576 :     case !bufferMode
577 :     of IO.NO_BUF => (
578 :     arrUpdate (buf, 0, elem);
579 :     writeArr {buf=buf, i=0, sz=SOME 1}
580 :     handle ex => (release(); outputExn (strm, "output1", ex)))
581 :     | IO.LINE_BUF => let val i = !pos val i' = i+1
582 :     in
583 :     arrUpdate (buf, i, elem); pos := i';
584 :     if ((i' = A.length buf) orelse (isNL elem))
585 :     then flushBuffer (strmMV, strm, "output1")
586 :     else ()
587 :     end
588 :     | IO.BLOCK_BUF => let val i = !pos val i' = i+1
589 :     in
590 :     arrUpdate (buf, i, elem); pos := i';
591 :     if (i' = A.length buf)
592 :     then flushBuffer (strmMV, strm, "output1")
593 :     else ()
594 :     end
595 :     (* end case *);
596 :     release()
597 :     end
598 :    
599 :     fun flushOut strmMV = let
600 :     val strm = lockAndChkClosedOut (strmMV, "flushOut")
601 :     in
602 :     flushBuffer (strmMV, strm, "flushOut");
603 :     SV.mPut (strmMV, strm)
604 :     end
605 :    
606 :     fun closeOut strmMV = let
607 :     val (strm as OSTRM{writer=PIO.WR{close, ...}, closed, cleanTag, ...}) =
608 :     SV.mTake strmMV
609 :     in
610 :     if !closed
611 :     then ()
612 :     else (
613 :     flushBuffer (strmMV, strm, "closeOut");
614 :     closed := true;
615 :     CleanIO.removeCleaner cleanTag;
616 :     close());
617 :     SV.mPut (strmMV, strm)
618 :     end
619 :    
620 : monnier 8 fun mkOutstream' (wr as PIO.WR{chunkSize, writeArr, writeVec, ...}, mode) =
621 : monnier 2 let
622 :     fun iterate f (buf, i, sz) = let
623 :     fun lp (_, 0) = ()
624 :     | lp (i, n) = let val n' = f{buf=buf, i=i, sz=SOME n}
625 :     in lp (i+n', n-n') end
626 :     in
627 :     lp (i, sz)
628 :     end
629 :     fun writeArr' {buf, i, sz} = let
630 :     val len = (case sz
631 :     of NONE => A.length buf - i
632 :     | (SOME n) => n
633 :     (* end case *))
634 :     in
635 :     iterate writeArr (buf, i, len)
636 :     end
637 :     fun writeVec' {buf, i, sz} = let
638 :     val len = (case sz
639 :     of NONE => V.length buf - i
640 :     | (SOME n) => n
641 :     (* end case *))
642 :     in
643 :     iterate writeVec (buf, i, len)
644 :     end
645 :     (* install a dummy cleaner *)
646 :     val tag = CleanIO.addCleaner dummyCleaner
647 :     val strm = SV.mVarInit (OSTRM{
648 :     buf = A.array(chunkSize, someElem),
649 :     pos = ref 0,
650 :     closed = ref false,
651 :     bufferMode = ref mode,
652 :     writer = wr,
653 :     writeArr = writeArr',
654 :     writeVec = writeVec',
655 :     cleanTag = tag
656 :     })
657 :     in
658 : monnier 8 (tag, strm)
659 :     end
660 :     fun mkOutstream arg = let
661 :     val (tag, strm) = mkOutstream' arg
662 :     in
663 : monnier 2 CleanIO.rebindCleaner (tag, fn () => closeOut strm);
664 :     strm
665 :     end
666 :    
667 :     fun getWriter strmMV = let
668 :     val (strm as OSTRM{writer, bufferMode, ...}) =
669 :     lockAndChkClosedOut (strmMV, "getWriter")
670 :     in
671 :     (writer, !bufferMode) before SV.mPut(strmMV, strm)
672 :     end
673 :    
674 :     (** Position operations on outstreams **)
675 :     datatype out_pos = OUTP of {
676 :     pos : PIO.pos,
677 :     strm : outstream
678 :     }
679 :    
680 :     fun getPosOut strmMV = let
681 :     val (strm as OSTRM{writer, ...}) =
682 :     lockAndChkClosedOut (strmMV, "getWriter")
683 :     fun release () = SV.mPut(strmMV, strm)
684 :     in
685 :     flushBuffer (strmMV, strm, "getPosOut");
686 :     case writer
687 :     of PIO.WR{getPos=SOME f, ...} => (
688 :     OUTP{pos = f(), strm = strmMV}
689 :     handle ex => (release(); outputExn(strm, "getPosOut", ex)))
690 :     | _ => (
691 :     release();
692 :     outputExn(strm, "getPosOut", IO.RandomAccessNotSupported))
693 :     (* end case *)
694 :     before release()
695 :     end
696 :     fun filePosOut (OUTP{pos, strm=strmMV}) = (
697 :     SV.mPut (strmMV, lockAndChkClosedOut (strmMV, "filePosOut"));
698 :     pos)
699 :     fun setPosOut (OUTP{pos, strm=strmMV}) = let
700 :     val (strm as OSTRM{writer, ...}) =
701 :     lockAndChkClosedOut (strmMV, "setPosOut")
702 :     fun release () = SV.mPut(strmMV, strm)
703 :     in
704 :     case writer
705 :     of PIO.WR{setPos=SOME f, ...} => (
706 :     (f pos)
707 :     handle ex => (release(); outputExn(strm, "setPosOut", ex)))
708 :     | _ => (
709 :     release();
710 :     outputExn(strm, "getPosOut", IO.RandomAccessNotSupported))
711 :     (* end case *);
712 :     release()
713 :     end
714 :    
715 :     fun setBufferMode (strmMV, mode) = let
716 :     val (strm as OSTRM{bufferMode, ...}) =
717 :     lockAndChkClosedOut (strmMV, "setBufferMode")
718 :     in
719 :     if (mode = IO.NO_BUF)
720 :     then flushBuffer (strmMV, strm, "setBufferMode")
721 :     else ();
722 :     bufferMode := mode;
723 :     SV.mPut (strmMV, strm)
724 :     end
725 :     fun getBufferMode strmMV = let
726 :     (** should we be checking for closed streams here??? **)
727 :     val (strm as OSTRM{bufferMode, ...}) =
728 :     lockAndChkClosedOut (strmMV, "getBufferMode")
729 :     in
730 :     !bufferMode before SV.mPut (strmMV, strm)
731 :     end
732 :    
733 :     (** Text stream specific operations **)
734 :     fun outputSubstr (strmMV, ss) = let
735 :     val (strm as OSTRM os) = lockAndChkClosedOut (strmMV, "outputSubstr")
736 :     fun release () = SV.mPut (strmMV, strm)
737 :     val (v, dataStart, dataLen) = substringBase ss
738 :     val {buf, pos, bufferMode, ...} = os
739 :     val bufLen = A.length buf
740 :     fun flush () = flushBuffer (strmMV, strm, "outputSubstr")
741 :     fun flushAll () = (#writeArr os {buf=buf, i=0, sz=NONE}
742 :     handle ex => (release(); outputExn (strm, "outputSubstr", ex)))
743 :     fun writeDirect () = (
744 :     case !pos
745 :     of 0 => ()
746 :     | n => (#writeArr os {buf=buf, i=0, sz=SOME n}; pos := 0)
747 :     (* end case *);
748 :     #writeVec os {buf=v, i=dataStart, sz=SOME dataLen})
749 :     handle ex => (release(); outputExn (strm, "outputSubstr", ex))
750 :     fun insert copyVec = let
751 :     val bufLen = A.length buf
752 :     val dataLen = V.length v
753 :     in
754 :     if (dataLen >= bufLen)
755 :     then writeDirect()
756 :     else let
757 :     val i = !pos
758 :     val avail = bufLen - i
759 :     in
760 :     if (avail < dataLen)
761 :     then let
762 :     val _ = A.copyVec{
763 :     src=v, si=dataStart, len=SOME avail,
764 :     dst=buf, di=i
765 :     }
766 :     val _ = flushAll()
767 :     val needsFlush = copyVec(v, avail, dataLen-avail, buf, 0)
768 :     in
769 :     pos := dataLen-avail;
770 :     if needsFlush then flush () else ()
771 :     end
772 :     else let
773 :     val needsFlush = copyVec(v, dataStart, dataLen, buf, i)
774 :     in
775 :     pos := i + dataLen;
776 :     if (needsFlush orelse (avail = dataLen))
777 :     then flush()
778 :     else ()
779 :     end
780 :     end
781 :     end
782 :     in
783 :     case !bufferMode
784 :     of IO.NO_BUF => writeDirect ()
785 :     | IO.LINE_BUF => insert lineBufCopyVec
786 :     | IO.BLOCK_BUF => insert blockBufCopyVec
787 :     (* end case *);
788 :     release()
789 :     end
790 :    
791 :     end (* StreamIO *)
792 :    
793 :     type vector = V.vector
794 :     type elem = V.elem
795 :     type instream = StreamIO.instream SV.mvar
796 :     type outstream = StreamIO.outstream SV.mvar
797 :    
798 :     (** Input operations **)
799 :     fun input strm = let val (v, strm') = StreamIO.input(SV.mTake strm)
800 :     in
801 :     SV.mPut (strm, strm'); v
802 :     end
803 :     fun input1 strm = (case StreamIO.input1(SV.mTake strm)
804 :     of NONE => NONE
805 :     | (SOME(elem, strm')) => (SV.mPut (strm, strm'); SOME elem)
806 :     (* end case *))
807 :     fun inputN (strm, n) = let val (v, strm') = StreamIO.inputN (SV.mTake strm, n)
808 :     in
809 :     SV.mPut (strm, strm'); v
810 :     end
811 :     fun inputAll (strm : instream) = let
812 :     val (v, strm') = StreamIO.inputAll(SV.mTake strm)
813 :     in
814 :     SV.mPut (strm, strm'); v
815 :     end
816 :     fun input1Evt _ = raise Fail "input1Evt unimplemented"
817 :     fun inputEvt _ = raise Fail "inputEvt unimplemented"
818 :     fun inputNEvt _ = raise Fail "inputNEvt unimplemented"
819 :     fun inputAllEvt _ = raise Fail "inputAllEvt unimplemented"
820 :     fun canInput (strm, n) = StreamIO.canInput (SV.mGet strm, n)
821 :     fun lookahead (strm : instream) = (case StreamIO.input1(SV.mGet strm)
822 :     of NONE => NONE
823 :     | (SOME(elem, _)) => SOME elem
824 :     (* end case *))
825 :     fun closeIn strm = let
826 :     val (s as StreamIO.ISTRM(buf as StreamIO.IBUF{data, ...}, _)) =
827 :     SV.mTake strm
828 :     in
829 :     StreamIO.closeIn s;
830 :     SV.mPut(strm, StreamIO.findEOS buf)
831 :     end
832 :     fun endOfStream strm = StreamIO.endOfStream(SV.mGet strm)
833 :     fun getPosIn strm = StreamIO.getPosIn(SV.mGet strm)
834 :     fun setPosIn (strm, p) = mUpdate(strm, StreamIO.setPosIn p)
835 :    
836 :     (** Output operations **)
837 :     fun output (strm, v) = StreamIO.output(SV.mGet strm, v)
838 :     fun output1 (strm, c) = StreamIO.output1(SV.mGet strm, c)
839 :     fun flushOut strm = StreamIO.flushOut(SV.mGet strm)
840 :     fun closeOut strm = StreamIO.closeOut(SV.mGet strm)
841 :     fun getPosOut strm = StreamIO.getPosOut(SV.mGet strm)
842 :     fun setPosOut (strm, p as StreamIO.OUTP{strm=strm', ...}) = (
843 :     mUpdate(strm, strm'); StreamIO.setPosOut p)
844 :    
845 :     fun mkInstream (strm : StreamIO.instream) = SV.mVarInit strm
846 :     fun getInstream (strm : instream) = SV.mGet strm
847 :     fun setInstream (strm : instream, strm') = mUpdate(strm, strm')
848 :    
849 :     fun mkOutstream (strm : StreamIO.outstream) = SV.mVarInit strm
850 :     fun getOutstream (strm : outstream) = SV.mGet strm
851 :     fun setOutstream (strm : outstream, strm') = mUpdate(strm, strm')
852 :    
853 :     (* figure out the proper buffering mode for a given writer *)
854 :     fun bufferMode (PIO.WR{ioDesc=NONE, ...}) = IO.BLOCK_BUF
855 :     | bufferMode (PIO.WR{ioDesc=SOME iod, ...}) =
856 :     if (OS.IO.kind iod = OS.IO.Kind.tty) then IO.LINE_BUF else IO.BLOCK_BUF
857 :    
858 :     (** Open files **)
859 :     fun openIn fname =
860 :     mkInstream(StreamIO.mkInstream(OSPrimIO.openRd fname, NONE))
861 :     handle ex => raise IO.Io{function="openIn", name=fname, cause=ex}
862 :     fun openOut fname = let
863 :     val wr = OSPrimIO.openWr fname
864 :     in
865 :     mkOutstream(StreamIO.mkOutstream(wr, bufferMode wr))
866 :     handle ex => raise IO.Io{function="openOut", name=fname, cause=ex}
867 :     end
868 :     fun openAppend fname =
869 :     mkOutstream(StreamIO.mkOutstream(OSPrimIO.openApp fname, IO.NO_BUF))
870 :     handle ex => raise IO.Io{function="openAppend", name=fname, cause=ex}
871 :    
872 :     (** Text stream specific operations **)
873 :     fun inputLine strm = let val (s, strm') = StreamIO.inputLine (SV.mTake strm)
874 :     in
875 :     SV.mPut(strm, strm'); s
876 :     end
877 :     fun outputSubstr (strm, ss) = StreamIO.outputSubstr (SV.mGet strm, ss)
878 :     fun openString src =
879 :     mkInstream(StreamIO.mkInstream(OSPrimIO.strReader src, NONE))
880 :     handle ex => raise IO.Io{function="openIn", name="<string>", cause=ex}
881 :    
882 :     structure ChanIO = ChanIOFn(
883 :     structure PrimIO = PIO
884 :     structure V = CharVector
885 :     structure A = CharArray)
886 :    
887 :     (* open an instream that is connected to the output port of a channel. *)
888 :     fun openChanIn ch =
889 :     mkInstream(StreamIO.mkInstream(ChanIO.mkReader ch, NONE))
890 :    
891 :     (* open an outstream that is connected to the input port of a channel. *)
892 :     fun openChanOut ch =
893 :     mkOutstream(StreamIO.mkOutstream(ChanIO.mkWriter ch, IO.NO_BUF))
894 :    
895 :     (** Standard streams **)
896 :     local
897 :     structure SIO = StreamIO
898 :     fun mkStdIn rebind = let
899 : monnier 8 val (tag, strm) = SIO.mkInstream'(OSPrimIO.stdIn(), NONE)
900 : monnier 2 in
901 :     if rebind
902 : monnier 8 then CleanIO.rebindCleaner (tag, dummyCleaner)
903 : monnier 2 else ();
904 :     strm
905 :     end
906 :     fun mkStdOut rebind = let
907 :     val wr = OSPrimIO.stdOut()
908 : monnier 8 val (tag, strm) = SIO.mkOutstream'(wr, bufferMode wr)
909 : monnier 2 in
910 :     if rebind
911 : monnier 8 then CleanIO.rebindCleaner (tag, fn () => SIO.flushOut strm)
912 : monnier 2 else ();
913 :     strm
914 :     end
915 :     fun mkStdErr rebind = let
916 : monnier 8 val (tag, strm) = SIO.mkOutstream'(OSPrimIO.stdErr(), IO.NO_BUF)
917 : monnier 2 in
918 :     if rebind
919 : monnier 8 then CleanIO.rebindCleaner (tag, fn () => SIO.flushOut strm)
920 : monnier 2 else ();
921 :     strm
922 :     end
923 :     in
924 :     (* build the standard streams. Since we are not currently running CML, we
925 :     * cannot do the cleaner rebinding here, but that is okay, since these are
926 :     * just place holders.
927 :     *)
928 :     val stdIn = mkInstream(mkStdIn false)
929 :     val stdOut = mkOutstream(mkStdOut false)
930 :     val stdErr = mkOutstream(mkStdErr false)
931 :    
932 :     fun print s = let val strm' = SV.mTake stdOut
933 :     in
934 :     StreamIO.output (strm', s); StreamIO.flushOut strm';
935 :     SV.mPut(stdOut, strm')
936 :     end
937 :    
938 : monnier 8 fun scanStream scanFn = let
939 :     val scan = scanFn StreamIO.input1
940 :     fun doit strm = let
941 :     val instrm = getInstream strm
942 :     in
943 :     case scan instrm
944 :     of NONE => NONE
945 :     | SOME(item, instrm') => (
946 :     setInstream(strm, instrm');
947 :     SOME item)
948 :     (* end case *)
949 :     end
950 :     in
951 :     doit
952 :     end
953 :    
954 : monnier 2 (* Establish a hook function to rebuild the I/O stack *)
955 :     val _ = CleanIO.stdStrmHook := (fn () => (
956 :     setInstream (stdIn, mkStdIn true);
957 :     setOutstream (stdOut, mkStdOut true);
958 :     setOutstream (stdErr, mkStdErr true);
959 :     SMLofNJ.Internals.prHook := print))
960 :     end (* local *)
961 :    
962 :     end (* TextIOFn *)

root@smlnj-gforge.cs.uchicago.edu
ViewVC Help
Powered by ViewVC 1.0.0