## winix-text-file-for-os-g.pkg
#
# The threadkit version of winix_text_file_for_os_g__premicrothread.
#                                       # winix_text_file_for_os_g__premicrothread is from   src/lib/std/src/io/winix-text-file-for-os-g--premicrothread.pkg

# Compiled by:
#     src/lib/std/standard.lib

stipulate
    package cv  =  vector_of_chars;                             # vector_of_chars is from   src/lib/std/vector-of-chars.pkg
    package cvs =  vector_slice_of_chars;                       # vector_slice_of_chars is from   src/lib/std/src/vector-slice-of-chars.pkg
    package eow =  io_startup_and_shutdown;                     # "eow" == "end of world"       # io_startup_and_shutdown is from   src/lib/std/src/io/io-startup-and-shutdown.pkg
    package iox =  io_exceptions;                               # io_exceptions is from   src/lib/std/src/io/io-exceptions.pkg
    package psx =  posixlib;                                    # posixlib is from   src/lib/std/src/psx/posixlib.pkg
    package ri  =  runtime_internals;                           # runtime_internals is from   src/lib/std/src/nj/runtime-internals.pkg
    package td  =  winix_base_text_file_io_driver_for_posix;    # winix_base_text_file_io_driver_for_posix is from   src/lib/std/src/io/winix-base-text-file-io-driver-for-posix.pkg
    package thk =  threadkit;                                   # threadkit is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg
    package wcs =  rw_vector_slice_of_chars;                    # rw_vector_slice_of_chars is from   src/lib/std/src/rw-vector-slice-of-chars.pkg
    package wcv =  rw_vector_of_chars;                          # rw_vector_of_chars is from   src/lib/std/rw-vector-of-chars.pkg
    package wty =  winix_types;                                 # winix_types is from   src/lib/std/src/posix/winix-types.pkg
    nb = log::note_on_stderr;                                   # log is from   src/lib/std/src/log.pkg
herein
# This generic is invoked (only) in:
#
#     src/lib/std/src/posix/winix-text-file-for-posix.pkg
#
generic package winix_text_file_for_os_g (
# ========================
#
package wxd # "wxd" == "WiniX file io Driver".
:
api {
include api Winix_Extended_File_Io_Driver_For_Os;       # Winix_Extended_File_Io_Driver_For_Os is from   src/lib/std/src/io/winix-extended-file-io-driver-for-os.api
#
stdin: Void -> drv::Filereader;
stdout: Void -> drv::Filewriter;
stderr: Void -> drv::Filewriter;
#
string_reader: String -> drv::Filereader;
}
where drv::Rw_Vector == td::Rw_Vector
where drv::Vector == td::Vector
where drv::Rw_Vector_Slice == td::Rw_Vector_Slice
where drv::Vector_Slice == td::Vector_Slice
where drv::Element == td::Element
where drv::File_Position == td::File_Position
where drv::Filereader == td::Filereader
where drv::Filewriter == td::Filewriter;
)
: (weak) Winix_Text_File_For_Os         # Winix_Text_File_For_Os is from   src/lib/std/src/io/winix-text-file-for-os.api
{
include package threadkit;              # threadkit is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg
stipulate
package drv = wxd::drv;
herein
# Overwrite the contents of a maildrop:
# swap the new value in and discard whatever was there.
#
fun m_update (drop, new_value)
    =
    ignore (maildrop_swap (drop, new_value));
# An element for initializing buffers:
#
some_element = '\000';
# Short local names for the vector and slice operations used below.
# vec_extract takes a (vector, start, length-option) triple, makes a
# slice of it and converts that slice to a vector.
vec_extract = cvs::to_vector o cvs::make_slice;
vec_get = cv::get;
rw_vec_set = wcv::set; # should rename vec -> vector all through here XXX SUCKO FIXME
burst_substring = substring::burst_substring;
empty_string = "";
# Do-nothing cleanup action.  The stream constructors below register it
# as a placeholder and the non-primed constructors then replace it.
fun dummy_cleaner ()
=
();
package pur { # Exported to client packages.
#
# Element and container types are those of vector_of_chars (cv);
# reader, writer and position types come from the driver package (drv).
Vector = cv::Vector;
Element = cv::Element;
Filereader = drv::Filereader;
Filewriter = drv::Filewriter;
File_Position = drv::File_Position;
# Functional input streams:
#
# An Input_Stream pairs a buffer with an Int offset into that
# buffer's data; the read functions below compare the offset with
# the data length to decide whether the buffer is used up.
#
Input_Stream
=
INPUT_STREAM (Input_Buffer, Int)
also
# One chunk of input plus the link to whatever follows it.
Input_Buffer
=
INPUT_BUFFER
{
data: Vector,
file_position: Null_Or( File_Position ),
nextdrop: Maildrop( Next ), # When this cell is empty, it means that
# there is an outstanding request to the
# server to extend the stream.
global_file_stuff: Global_File_Stuff
}
also
Next
= NEXT Input_Buffer # Forward link to additional data.
| NO_NEXT
# Placeholder for forward link.
| TERMINATED
# Termination of the stream.
also
# State shared by every buffer read from the same filereader.
Global_File_Stuff
=
GLOBAL_FILE_STUFF
{ filereader: Filereader,
#
read_vector: Int -> Vector,
read_vector_mailop: Int -> thk::Mailop( Vector ),
#
closed: Ref( Bool ),
#
get_file_position: Void -> Null_Or( File_Position ),
#
last_nextref: Maildrop( Maildrop( Next) ), # Points to the 'next' refcell in the last buffer.
#
clean_tag: eow::Tag
};
# Project the shared per-file record out of an input buffer.
#
fun global_file_stuff_of_ibuf (INPUT_BUFFER { global_file_stuff => shared, ... })
    =
    shared;
# Return the best_io_quantum field of the filereader behind a buffer.
#
fun best_io_quantum_of_ibuf (INPUT_BUFFER { global_file_stuff => GLOBAL_FILE_STUFF { filereader => drv::FILEREADER { best_io_quantum => quantum, ... }, ... }, ... })
    =
    quantum;
# Return the read_vector function stored in a buffer's shared file record.
# Should this be renamed get_read_vector? XXX QUERO FIXME
#
fun read_vector buf
    =
    {   (global_file_stuff_of_ibuf buf)
            ->
            GLOBAL_FILE_STUFF { read_vector => reader_fn, ... };

        reader_fn;
    };
# Raise iox::IO for operation 'ml_op', naming the file of the
# underlying filereader and carrying 'exn' as the cause.
#
fun raise_io_exception (GLOBAL_FILE_STUFF { filereader, ... }, ml_op, exn)
    =
    {   filereader -> drv::FILEREADER { filename, ... };

        raise exception iox::IO { op => ml_op, name => filename, cause => exn };
    };
# Result of asking for the buffer after a given one (see
# extend_stream and get_next_buffer): either no more input,
# or the following buffer.
Next_Data = EOF
| DATA Input_Buffer
;
# Terminate an input stream:
#
# Looks up the 'next' cell of the last buffer via last_nextref and
# replaces its contents with TERMINATED.
#
fun terminate (global_file_stuff as GLOBAL_FILE_STUFF { last_nextref, clean_tag, ... } )
=
{ m = thk::get_from_maildrop last_nextref;
#
# Taking the value empties the cell, which (per the Input_Buffer
# comments) is how the stream is locked while we work on it.
case (take_from_maildrop m)
#
(m' as NEXT _)
=>
# This cell already has a successor, so it was not the last one
# after all: restore it and try again with the current last_nextref.
{ put_in_maildrop (m, m');
#
terminate global_file_stuff;
};
TERMINATED
=>
# Already terminated: just restore the cell.
put_in_maildrop (m, TERMINATED);
# Remaining case (NO_NEXT): drop the cleanup registration, then mark the cell.
_ => { eow::drop_stream_startup_and_shutdown_actions clean_tag;
put_in_maildrop (m, TERMINATED);
};
esac;
};
# Close an input stream given its global_file_stuff record;
# we need this function for the cleanup hook to
# avoid a space leak.
#
# Does nothing when the 'closed' flag is already TRUE.  Otherwise it
# terminates the stream, sets the flag and calls the filereader's
# close function; an exception from close is re-raised as iox::IO
# with op "close_input".
#
fun close_in_global_file_stuff (GLOBAL_FILE_STUFF { closed => REF TRUE, ... } )
=>
();
close_in_global_file_stuff (global_file_stuff as GLOBAL_FILE_STUFF { closed, filereader => drv::FILEREADER { close, ... }, ... } )
=>
{
# ** We need some kind of lock on the input stream to do this safely!!! ** XXX BUGGO FIXME
terminate global_file_stuff;
# The flag is set before close() runs, so it stays TRUE even if close() raises.
closed := TRUE;
close()
except
ex = raise_io_exception (global_file_stuff, "close_input", ex);
};
end;
# Extend the stream by a chunk.
# Invariant: the next m-variable
# is empty on entry and full on exit.
#
# read_fn is applied to the reader's best_io_quantum to obtain the
# chunk.  Returns EOF when the chunk is empty, else DATA of the new
# buffer.  Any exception raised in the body is re-raised as iox::IO
# tagged with ml_op, after the next cell has been refilled.
#
fun extend_stream (read_fn, ml_op, buf as INPUT_BUFFER { nextdrop, global_file_stuff, ... } )
=
{ global_file_stuff -> GLOBAL_FILE_STUFF { get_file_position, last_nextref, ... };
# Record the file position before reading; it is stored in the new buffer.
file_position
=
get_file_position ();
chunk = read_fn (best_io_quantum_of_ibuf buf);
if (cv::length chunk == 0)
#
# Empty read: refill the cell with NO_NEXT, then close the underlying file.
put_in_maildrop (nextdrop, NO_NEXT);
close_in_global_file_stuff global_file_stuff;
EOF;
else
new_next = make_empty_maildrop ();
buf' = INPUT_BUFFER
{
file_position,
global_file_stuff,
data => chunk,
nextdrop => new_next
};
# Note that we do not fill the next cell
# until after the last_nextref has been updated.
#
# This ensures that someone attempting to
# access the last_nextref will not acquire the lock
# until after we are done.
#
m_update (last_nextref, new_next);
put_in_maildrop (nextdrop, NEXT buf'); # releases lock!!
put_in_maildrop (new_next, NO_NEXT);
DATA buf';
fi;
}
except ex
=
{
# Restore the invariant (cell full on exit) before reporting the failure.
put_in_maildrop (nextdrop, NO_NEXT);
raise_io_exception (global_file_stuff, ml_op, ex);
};
# Get the next buffer in the stream,
# extending it if necessary.
#
# If the stream must be extended,
# we lock it by taking the value from the
# next cell; the extend_stream function
# is responsible for filling in the cell.
#
# Returns EOF for a TERMINATED link, DATA for an existing successor,
# and otherwise whatever extend_stream returns.
#
fun get_next_buffer (read_fn, ml_op) (buf as INPUT_BUFFER { nextdrop, global_file_stuff, ... } )
=
get (thk::get_from_maildrop nextdrop)
where
fun get TERMINATED => EOF;
get (NEXT buf') => DATA buf';
# The first look used get_from_maildrop; now take the value and
# re-check it, since it may no longer be NO_NEXT.
get NO_NEXT => case (take_from_maildrop nextdrop)
#
NO_NEXT => extend_stream (read_fn, ml_op, buf);
#
# The cell changed in the meantime: put the value back and dispatch on it.
other => {
put_in_maildrop (nextdrop, other);
get other;
};
esac;
end;
end;
# Build a reader for chunks of at least the requested size:
# the request is rounded up to a whole number of I/O quanta
# before being passed to the file's read_vector function.
#
fun read_chunk buf
    =
    {   (global_file_stuff_of_ibuf buf)
            ->
            GLOBAL_FILE_STUFF { read_vector => fetch, filereader => drv::FILEREADER { best_io_quantum => quantum, ... }, ... };

        slack = quantum - 1;

        if (slack == 0)
            #
            # A quantum of one needs no rounding.
            \\ n = fetch n;
        else
            # Round up to next multiple of the quantum:
            \\ n = fetch (int::quot (n + slack, quantum) * quantum);
        fi;
    };
# Build an input function from 'get_buf', a function mapping a buffer
# to the Next_Data that follows it.  The input function returns the
# unread remainder of the current buffer if there is one, otherwise
# it moves on to following buffers; at EOF it returns the empty string.
#
fun generalized_input get_buf
    =
    {   fun pull (INPUT_STREAM (buf, pos))
            =
            {   buf -> INPUT_BUFFER { data, ... };

                end_pos = cv::length data;

                if (pos >= end_pos)
                    #
                    case (get_buf buf)
                        #
                        EOF       => (empty_string, INPUT_STREAM (buf, end_pos));
                        DATA more => pull (INPUT_STREAM (more, 0));
                    esac;
                else
                    (vec_extract (data, pos, NULL), INPUT_STREAM (buf, end_pos));
                fi;
            };

        pull;
    };
# Walk the chain of NEXT links from 'buf' and return a stream
# positioned just past the data of the last buffer reached.
#
fun find_eos (buf as INPUT_BUFFER { nextdrop, data, ... })
    =
    case (thk::get_from_maildrop nextdrop)
        #
        NEXT successor => find_eos successor;
        _              => INPUT_STREAM (buf, cv::length data);
    esac;
# Read whatever is readily available: the rest of the current buffer,
# or else one more chunk obtained with the file's read_vector function.
#
fun read (stream as INPUT_STREAM (buf, _))
    =
    {   next_buffer = get_next_buffer (read_vector buf, "read");

        generalized_input next_buffer stream;
    };
# Read a single element.  Returns THE (element, advanced-stream),
# or NULL when the stream is terminated or extending it hits EOF.
#
fun read_one (INPUT_STREAM (buf, pos))
=
{ buf -> INPUT_BUFFER { data, nextdrop, ... };
if (pos < cv::length data)
#
# Fast path: the element is already in this buffer.
THE (vec_get (data, pos), INPUT_STREAM (buf, pos+1));
else
# This buffer is used up: dispatch on its forward link.
fun get (NEXT buf)
=>
read_one (INPUT_STREAM (buf, 0));
get TERMINATED
=>
NULL;
get NO_NEXT
=>
# Take the link value (locking the cell) and re-check it.
case (take_from_maildrop nextdrop)
#
NO_NEXT => case (extend_stream (read_vector buf, "read_one", buf))
#
EOF => NULL;
DATA rest => read_one (INPUT_STREAM (rest, 0));
esac;
# The cell changed in the meantime: restore it and dispatch on the new value.
other=> { put_in_maildrop (nextdrop, other);
#
get other;
};
esac;
end;
get (thk::get_from_maildrop nextdrop);
fi;
};
# Read up to n elements, crossing buffer boundaries and extending the
# stream as needed.  Returns (concatenated data, advanced stream);
# the data is shorter than n only if the stream ends first.
#
fun read_n (INPUT_STREAM (buf, pos), n)
=
{ fun join (item, (list, stream))
=
(item ! list, stream);
# Collect n elements starting at offset i of buf, as a list of pieces.
fun input_list (buf as INPUT_BUFFER { data, ... }, i, n)
=
{ len = cv::length data;
#
remain = len-i;
if (remain >= n)
#
([vec_extract (data, i, THE n)], INPUT_STREAM (buf, i+n));
else
# Take what is left here and fetch the remaining n-remain from later buffers.
join (vec_extract (data, i, NULL), next_buf (buf, n-remain));
fi;
}
also
# Continue collecting n elements from the buffer after buf.
fun next_buf (buf as INPUT_BUFFER { nextdrop, data, ... }, n)
=
get (thk::get_from_maildrop nextdrop)
where
fun get (NEXT buf)
=>
input_list (buf, 0, n);
get TERMINATED
=>
([], INPUT_STREAM (buf, cv::length data));
get NO_NEXT
=>
# Take the link value (locking the cell) and re-check it.
case (take_from_maildrop nextdrop)
#
NO_NEXT
=>
case (extend_stream (read_vector buf, "read_n", buf))
#
EOF => ([], INPUT_STREAM (buf, cv::length data));
DATA rest => input_list (rest, 0, n);
esac;
other=> { put_in_maildrop (nextdrop, other);
#
get other;
};
esac;
end;
end;
(input_list (buf, pos, n))
->
(data, stream);
(cv::cat data, stream);
};
# Read everything up to end of input.  Returns the concatenated data
# together with the stream produced by find_eos on the starting buffer.
#
fun read_all (stream as INPUT_STREAM (buf, _))
=
{ (global_file_stuff_of_ibuf buf)
->
GLOBAL_FILE_STUFF { filereader => drv::FILEREADER { avail, ... }, ... };
big_input = generalized_input (get_next_buffer (big_chunk, "read_all"))
where
# The requested size passed in by extend_stream is ignored;
# the size is computed from avail() instead.
fun big_chunk _
=
# Read a chunk that is as large as the available input.
# Note that for systems that use CR-LF for '\n',
# the size will be too large, but this should be okay.
#
{ delta = case (avail ())
#
THE n => n;
NULL => best_io_quantum_of_ibuf buf;
esac;
read_chunk buf delta;
};
end;
data = cv::cat (loop (big_input stream))
where
# Keep reading until an empty vector comes back, collecting the pieces in order.
fun loop (v, stream)
=
if (cv::length v == 0) [];
else v ! loop (big_input stream);
fi;
end;
(data, find_eos buf);
};
# Close the file underlying an input stream.
#
fun close_input (INPUT_STREAM (buf, _))
    =
    {   shared = global_file_stuff_of_ibuf buf;

        close_in_global_file_stuff shared;
    };
# Report whether the stream is at its end.  This may try to extend
# the stream (i.e. read from the file) to find out.
#
fun end_of_stream (INPUT_STREAM (buf as INPUT_BUFFER { nextdrop, ... }, pos))
=
# The link value is taken up front, so every branch below must refill
# the cell (directly, or through extend_stream).
case (take_from_maildrop nextdrop)
#
(other as NEXT _)
=>
# A following buffer exists, so we are not at the end.
{ put_in_maildrop (nextdrop, other);
#
FALSE;
};
other
=>
{ buf -> INPUT_BUFFER { data, global_file_stuff=>GLOBAL_FILE_STUFF { closed, ... }, ... };
if (pos == cv::length data)
#
case (other, *closed)
#
(NO_NEXT, FALSE)
=>
# Open file, no successor yet: only a read can tell.
# extend_stream refills the cell.
case (extend_stream (read_vector buf, "end_of_stream", buf))
#
EOF => TRUE;
_ => FALSE;
esac;
# Terminated, or already closed: nothing more can arrive.
_ => { put_in_maildrop (nextdrop, other);
#
TRUE;
};
esac;
else
# Unread data remains in this buffer.
put_in_maildrop (nextdrop, other);
FALSE;
fi;
};
esac;
# Build an input stream over 'filereader' whose first buffer holds
# 'data'.  Registers dummy_cleaner as the stream's cleanup action and
# returns (clean_tag, stream) so the caller can install a real one.
#
fun make_instream' (filereader, data)
=
{ filereader -> drv::FILEREADER { read_vector,
read_vector_mailop,
get_file_position,
set_file_position,
...
};
# Positions are reported only when the reader supports both getting
# and setting them; otherwise the stream always reports NULL.
get_file_position
=
case (get_file_position, set_file_position)
#
(THE f, THE _) => \\ () = THE (f());
_ => \\ () = NULL;
esac;
nextdrop = make_full_maildrop NO_NEXT;
closed_flag = REF FALSE;
clean_tag = eow::note_stream_startup_and_shutdown_actions dummy_cleaner;
global_file_stuff
=
GLOBAL_FILE_STUFF
{
filereader,
read_vector,
read_vector_mailop,
get_file_position,
clean_tag,
#
closed => closed_flag,
# The first buffer is also the last one, so last_nextref starts out at its cell.
last_nextref => make_full_maildrop nextdrop
};
# What should we do about the position in this case ??
# Suggestion: When building a stream with supplied initial data,
# nothing can be said about the positions inside that initial
# data (who knows where that data even came from!).
file_position
=
if (cv::length data == 0) get_file_position ();
else NULL;
fi;
buf = INPUT_BUFFER
{
file_position,
data,
global_file_stuff,
nextdrop
};
stream = INPUT_STREAM (buf, 0);
(clean_tag, stream);
};
# Build an input stream and replace its placeholder cleanup
# action with one that closes the underlying file.
#
fun make_instream arg
    =
    {   (make_instream' arg) -> (tag, stream);

        stream -> INPUT_STREAM (INPUT_BUFFER { global_file_stuff, ... }, _);

        fun close_at_cleanup ()
            =
            close_in_global_file_stuff global_file_stuff;

        eow::change_stream_startup_and_shutdown_actions (tag, close_at_cleanup);

        stream;
    };
# Terminate the stream and hand back its filereader together with
# all data that was buffered but not yet consumed: the unread tail
# of the current buffer followed by the data of every later buffer.
#
fun get_reader (INPUT_STREAM (buf, pos))
    =
    {   buf -> INPUT_BUFFER { data, global_file_stuff, nextdrop, ... };

        global_file_stuff -> GLOBAL_FILE_STUFF { filereader, ... };

        # Data of every buffer reachable through NEXT links from 'drop'.
        #
        fun chained_data drop
            =
            case (thk::get_from_maildrop drop)
                #
                NEXT (INPUT_BUFFER { data => chunk, nextdrop => following, ... })
                    =>
                    chunk ! chained_data following;

                _   => [];
            esac;

        terminate global_file_stuff;

        later = chained_data nextdrop;

        pieces
            =
            if (pos < cv::length data)   vec_extract (data, pos, NULL) ! later;
            else                         later;
            fi;

        (filereader, cv::cat pieces);
    };
/*
# * Position operations on instreams *
enum in_pos = INP of {
base: pos,
offset: Int,
global_file_stuff: global_file_stuff
}
*/
/*
fun getPosIn (INPUT_STREAM (buf, pos)) = (case buf
of INPUT_BUFFER { basePos=NULL, global_file_stuff, ... } =>
inputExn (global_file_stuff, "getPosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED)
| INPUT_BUFFER { basePos=THE p, global_file_stuff, ... } => INP {
base = p, offset = pos, global_file_stuff = global_file_stuff
}
) # end case
*/
/*
fun filePosIn (INP { base, offset, ... } ) =
position.+(base, file_position::from_int offset)
*/
# Get the underlying file position of a stream:
#
# The position is found by seeking to the buffer's recorded start
# position, re-reading 'pos' elements, asking the reader where it is,
# and finally restoring the reader's original position.
# Raises iox::IO (op "filePosIn") if the buffer has no recorded position.
#
fun file_position_in (INPUT_STREAM (buf, pos))
=
case buf
#
INPUT_BUFFER { file_position=>NULL, global_file_stuff, ... }
=>
raise_io_exception (global_file_stuff, "filePosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
INPUT_BUFFER { file_position=>THE base, global_file_stuff, ... }
=>
{ global_file_stuff -> GLOBAL_FILE_STUFF { filereader => drv::FILEREADER rd, read_vector, ... };
#
case (rd.get_file_position, rd.set_file_position)
#
(THE get_file_position, THE set_file_position)
=>
# Remember where the reader currently is so it can be restored afterwards.
{ tmp_pos = get_file_position ();
#
# Read and discard n elements; an empty read means 'pos' lies beyond the data.
fun read_n 0 => ();
read_n n => case (cv::length (read_vector n))
#
0 => raise_io_exception (global_file_stuff, "filePosIn", DIE "bogus position");
k => read_n (n-k);
esac;
end;
set_file_position base;
read_n pos;
# The result is the position after re-reading; 'then' restores the reader afterwards.
get_file_position ()
then
set_file_position tmp_pos;
};
# NOTE(review): presumably unreachable because make_instream' records a
# position only when the reader has both functions -- confirm.
_ => raise exception DIE "filePosIn: impossible";
esac;
};
esac;
/*
fun setPosIn (pos as INP { global_file_stuff as GLOBAL_FILE_STUFF { reader, ... }, ... } ) = let
fpos = filePosIn pos
my (drv::FILEREADER rd) = reader
in
terminate global_file_stuff;
the rd.setPos fpos;
make_instream (drv::FILEREADER rd, NULL)
end
*/
# * Text stream specific operations *
#
# Read one line.  Returns THE (line, advanced-stream), or NULL when no
# characters remain.  The line includes its terminating '\n'; if the
# input ends in the middle of a line, a "\n" is appended.
#
fun read_line (INPUT_STREAM (buf as INPUT_BUFFER { data, ... }, pos))
=
{ my (data, stream)
=
# At the end of this buffer nothing has been collected yet, hence is_empty=TRUE.
if (cv::length data == pos) next_buf (TRUE, buf);
else scan_data (buf, pos);
fi;
result = cv::cat data;
if (cv::length result == 0) NULL;
else THE (result, stream);
fi;
}
where
fun join (item, (list, stream))
=
(item ! list, stream);
# Continue the line in the buffer after buf.  is_empty says whether
# any characters of the line have been collected so far.
fun next_buf (is_empty, buf as INPUT_BUFFER { nextdrop, data, ... } )
=
get (thk::get_from_maildrop nextdrop)
where
# Input ended: supply the missing "\n" unless nothing was collected.
fun last ()
=
(is_empty ?? [] :: ["\n"], INPUT_STREAM (buf, cv::length data));
fun get (NEXT buf)
=>
scan_data (buf, 0);
get NO_NEXT
=>
# Take the link value (locking the cell) and re-check it.
case (take_from_maildrop nextdrop)
#
NO_NEXT => case (extend_stream (read_vector buf, "read_line", buf))
#
DATA rest => scan_data (rest, 0);
EOF => last ();
esac;
other => { put_in_maildrop (nextdrop, other);
#
get other;
};
esac;
get TERMINATED
=>
last ();
end;
end
also
# Scan buf from offset i for a '\n'; returns the pieces of the line
# and the stream positioned just after the newline.
fun scan_data (buf as INPUT_BUFFER { data, ... }, i)
=
scan i
where
len = cv::length data;
fun scan j
=
if (j == len)
#
# No newline in this buffer: keep its tail and carry on in the next one.
join (vec_extract (data, i, NULL), next_buf (FALSE, buf));
else
if (vec_get (data, j) == '\n')
#
([vec_extract (data, i, THE (j+1-i))], INPUT_STREAM (buf, j+1));
else
scan (j+1);
fi;
fi;
end;
end;
# IO mailop constructors:
# We exploit the "functional" nature of stream IO to implement the mailop
# constructors. These constructors spawn a thread to do the operation
# and write the result in a Oneshot_Maildrop that serves as the
# synchronization value.
# NOTE: this implementation has the weakness that it prevents shutdown when
# everything else is deadlocked, since the thread that is spawned to actually
# do the I/O could proceed. XXX SUCKO FIXME
#
stipulate
    # Outcome of running an input operation in the helper thread:
    # either its value or the exception it raised.
    #
    Result(X) = RESULT(X)
              | EXCEPTION Exception
              ;

    include package threadkit;          # threadkit is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg

    # Turn the blocking function 'input_op' into a mailop constructor.
    # The resulting function maps an argument to a mailop which, each time
    # it is built, starts a thread that applies input_op to the argument and
    # stores the outcome in a oneshot maildrop.  The mailop yields the value,
    # or re-raises the exception that input_op raised.
    #
    fun do_input input_op
        =
        {   # Run input_op, capturing an exception as a value so that
            # it can be transported to the waiting thread.
            #
            fun read arg
                =
                RESULT (input_op arg)
                except
                    ex = EXCEPTION ex;

            \\ arg
                =
                thk::dynamic_mailop
                    {.
                        reply_1shot = make_oneshot_maildrop ();

                        make_thread "text I/O" {.
                            put_in_oneshot (reply_1shot, read arg);
                        };

                        get_from_oneshot' reply_1shot
                            ==>
                            \\ (RESULT x    ) => x;
                               (EXCEPTION ex) => raise exception ex;
                            end;
                    };
        };
herein
    input1evt         = do_input read_one;
    input_mailop      = do_input read;
    input_nevt        = do_input read_n;
    #
    input_all_mailop  = do_input read_all;
    input_line_mailop = do_input read_line;
end;
# ** Output streams **
# An output stream is implemented as a monitor
# using a maildrop to hold its data.
#
# The output functions below take the record out of the maildrop
# while they work on the stream and put it back when done.
Output_Stream_Info
=
OUTPUT_STREAM_INFO
{
buffer: wcv::Rw_Vector,
# Number of buffered elements, i.e. index of the next free slot in 'buffer'.
first_free_byte_in_buffer: Ref( Int ),
is_closed: Ref( Bool ),
buffering_mode: Ref( iox::Buffering_Mode ),
filewriter: Filewriter,
write_rw_vector: wcs::Slice -> Void,
write_vector: cvs::Slice -> Void,
clean_tag: eow::Tag
};
Output_Stream
=
Maildrop( Output_Stream_Info );
# TRUE exactly for the newline character.
#
fun is_newline c
    =
    c == '\n';
# Pick the predicate that decides which characters force a flush:
# newlines for a line-buffered stream, nothing otherwise.
#
fun is_line_break (OUTPUT_STREAM_INFO { buffering_mode, ... })
    =
    case *buffering_mode
        #
        iox::LINE_BUFFERING => is_newline;
        _                   => (\\ _ = FALSE);
    esac;
# Raise iox::IO for output operation 'ml_op', naming the
# file of the stream's filewriter and carrying 'cause'.
#
fun output_exn (OUTPUT_STREAM_INFO { filewriter, ... }, ml_op, cause)
    =
    {   filewriter -> drv::FILEWRITER { filename, ... };

        raise exception iox::IO { op => ml_op, name => filename, cause };
    };
# Acquire the stream by taking its record out of the maildrop.
# If the stream is already closed the record is put back and
# iox::IO (cause CLOSED_IO_STREAM) is raised; otherwise the record
# is returned and the caller is responsible for putting it back.
#
fun lock_and_check_closed_out (strm_mv, ml_op)
    =
    {   stream = take_from_maildrop strm_mv;

        stream -> OUTPUT_STREAM_INFO { is_closed, ... };

        if (*is_closed)
            #
            put_in_maildrop (strm_mv, stream);
            output_exn (stream, ml_op, iox::CLOSED_IO_STREAM);
        else
            stream;
        fi;
    };
# Write out any buffered elements and mark the buffer empty.
# The caller must already hold the stream record.  On a write error
# the record is put back into strm_mv and iox::IO is raised with op
# ml_op, so in that case the caller must not release it again.
#
fun flush_buffer (strm_mv, stream as OUTPUT_STREAM_INFO { buffer, first_free_byte_in_buffer, write_rw_vector, ... }, ml_op)
=
case *first_free_byte_in_buffer
#
0 => ();
n => { write_rw_vector (wcs::make_slice (buffer, 0, THE n));
#
first_free_byte_in_buffer := 0;
}
except ex
=
{ put_in_maildrop (strm_mv, stream);
#
output_exn (stream, ml_op, ex);
};
esac;
# Copy src_len elements of 'src', starting at src_i, into 'dst'
# starting at dst_i, one element at a time, and report whether a
# newline was among them.
# This is used for LINE_BUFFERING output of strings and substrings.
#
fun line_buf_copy_vec (src, src_i, src_len, dst, dst_i)
    =
    {   limit = src_i + src_len;

        fun copy_from (from, to, saw_newline)
            =
            if (from < limit)
                #
                ch = vec_get (src, from);
                rw_vec_set (dst, to, ch);
                copy_from (from + 1, to + 1, saw_newline or is_newline ch);
            else
                saw_newline;
            fi;

        copy_from (src_i, dst_i, FALSE);
    };
# Bulk-copy from_len elements of 'from', starting at from_i, into
# 'into' at offset 'at'.  Always answers FALSE ("no flush needed"),
# so it can stand in for line_buf_copy_vec.
# This is used for BLOCK_BUFFERING output of strings and substrings.
#
fun block_buf_copy_vec (from, from_i, from_len, into, at)
    =
    {   source_slice = cvs::make_slice (from, from_i, THE from_len);

        wcs::copy_vector { from => source_slice, into, at };

        FALSE;
    };
# Write vector v to the output stream held in strm_mv, honouring the
# stream's buffering mode.  The stream record is held for the whole
# operation and put back at the end; every error path below puts it
# back before raising iox::IO with op "write".
#
fun write (strm_mv, v)
=
{ case *buffering_mode
#
iox::NO_BUFFERING => write_direct ();
iox::LINE_BUFFERING => insert line_buf_copy_vec;
iox::BLOCK_BUFFERING => insert block_buf_copy_vec;
esac;
release();
}
where
(lock_and_check_closed_out (strm_mv, "write"))
->
(stream as OUTPUT_STREAM_INFO os);
fun release ()
=
put_in_maildrop (strm_mv, stream);
os -> { buffer, first_free_byte_in_buffer, buffering_mode, ... };
# Write out the buffered elements (flush_buffer releases the stream itself on error).
fun flush ()
=
flush_buffer (strm_mv, stream, "write");
# Write out the entire buffer, regardless of first_free_byte_in_buffer.
fun flush_all ()
=
os.write_rw_vector (wcs::make_full_slice buffer)
except ex
=
{ release();
#
output_exn (stream, "write", ex);
};
# Bypass the buffer: write out anything pending, then v itself.
fun write_direct ()
=
{ case *first_free_byte_in_buffer
#
0 => ();
#
n => { os.write_rw_vector (wcs::make_slice (buffer, 0, THE n));
#
first_free_byte_in_buffer := 0;
};
esac;
os.write_vector (cvs::make_full_slice v);
}
except ex
=
{ release ();
#
output_exn (stream, "write", ex);
};
# Buffered write.  copy_vector copies a range of v into the buffer
# and reports whether the copied data calls for a flush.
fun insert copy_vector
=
{
buf_len = wcv::length buffer;
data_len = cv::length v;
if (data_len >= buf_len)
#
# Data at least as large as the buffer is never buffered.
write_direct();
else
i = *first_free_byte_in_buffer;
#
avail = buf_len - i;
if (avail < data_len)
#
# Does not fit: top the buffer up, write the full buffer out,
# then start over at offset 0 with the rest of v.
wcs::copy_vector
{ from => cvs::make_slice (v, 0, THE avail),
into => buffer,
at => i
};
flush_all();
needs_flush = copy_vector (v, avail, data_len-avail, buffer, 0);
first_free_byte_in_buffer := data_len-avail;
needs_flush ?: flush ();
else
needs_flush = copy_vector (v, 0, data_len, buffer, i);
first_free_byte_in_buffer := i + data_len;
# Also flush when the data filled the buffer exactly.
if (needs_flush or avail == data_len)
flush ();
fi;
fi;
fi;
};
end;
# Write a single element to the output stream held in strm_mv,
# honouring the stream's buffering mode.
#
fun write_one (strm_mv, element)
=
{ (lock_and_check_closed_out (strm_mv, "write_one"))
->
(stream as OUTPUT_STREAM_INFO { buffer, first_free_byte_in_buffer, buffering_mode, write_rw_vector, ... } );
fun release ()
=
put_in_maildrop (strm_mv, stream);
case *buffering_mode
#
iox::NO_BUFFERING
=>
# Unbuffered: use slot 0 of the buffer as scratch space for a one-element write.
{ rw_vec_set (buffer, 0, element);
#
write_rw_vector (wcs::make_slice (buffer, 0, THE 1))
except ex
=
{ release();
#
output_exn (stream, "write_one", ex);
};
};
iox::LINE_BUFFERING
=>
{ i = *first_free_byte_in_buffer;
#
i' = i+1;
rw_vec_set (buffer, i, element);
first_free_byte_in_buffer := i';
# Flush when the buffer is full or a line has been completed.
if (i' == wcv::length buffer or is_newline element)
#
flush_buffer (strm_mv, stream, "write_one");
fi;
};
iox::BLOCK_BUFFERING
=>
{ i = *first_free_byte_in_buffer;
i' = i+1;
rw_vec_set (buffer, i, element);
first_free_byte_in_buffer := i';
# Flush only when the buffer is full.
if (i' == wcv::length buffer)
flush_buffer (strm_mv, stream, "write_one");
fi;
};
esac;
release();
};
# Write out whatever the stream currently has buffered.
# Raises iox::IO if the stream is closed or the write fails.
#
fun flush strm_mv
    =
    {   locked = lock_and_check_closed_out (strm_mv, "flush");

        flush_buffer (strm_mv, locked, "flush");

        put_in_maildrop (strm_mv, locked);
    };
# Close an output stream: flush it, mark it closed, drop its cleanup
# registration and close the underlying filewriter.  Closing a stream
# that is already closed does nothing.
#
fun close_output strm_mv
=
{
# Taken directly rather than via lock_and_check_closed_out,
# so a closed stream does not raise here.
(take_from_maildrop strm_mv)
->
(stream as OUTPUT_STREAM_INFO { filewriter => drv::FILEWRITER { close, ... }, is_closed, clean_tag, ... } );
if (not *is_closed)
#
flush_buffer (strm_mv, stream, "close");
is_closed := TRUE;
eow::drop_stream_startup_and_shutdown_actions clean_tag;
close ();
fi;
put_in_maildrop (strm_mv, stream);
};
# Build an output stream over filewriter 'wr' with buffering mode
# 'mode'.  Registers dummy_cleaner as the stream's cleanup action and
# returns (tag, stream) so the caller can install a real one.
#
fun make_outstream' (wr as drv::FILEWRITER { best_io_quantum, write_rw_vector, write_vector, ... }, mode)
    =
    {   # Turn a partial writer 'f' (which reports how many elements it
        # wrote) into one that keeps calling f on the unwritten remainder
        # until the slice is empty.
        #
        fun drain (f, size, subslice)
            =
            {   fun again piece
                    =
                    if (size piece != 0)
                        #
                        written = f piece;
                        #
                        again (subslice (piece, written, NULL));
                    fi;

                again;
            };

        write_all_rw = drain (write_rw_vector, wcs::length, wcs::make_subslice);
        write_all_ro = drain (write_vector,    cvs::length, cvs::make_subslice);

        # Install a dummy cleaner:
        #
        tag = eow::note_stream_startup_and_shutdown_actions dummy_cleaner;

        info
            =
            OUTPUT_STREAM_INFO
              {
                first_free_byte_in_buffer => REF 0,
                buffer                    => wcv::make_rw_vector (best_io_quantum, some_element),
                is_closed                 => REF FALSE,
                buffering_mode            => REF mode,
                filewriter                => wr,
                write_rw_vector           => write_all_rw,
                write_vector              => write_all_ro,
                clean_tag                 => tag
              };

        (tag, make_full_maildrop info);
    };
# Build an output stream and replace its placeholder
# cleanup action with one that closes the stream.
#
fun make_outstream arg
    =
    {   (make_outstream' arg) -> (tag, stream);

        fun close_at_cleanup ()
            =
            close_output stream;

        eow::change_stream_startup_and_shutdown_actions (tag, close_at_cleanup);

        stream;
    };
# Return the stream's filewriter together with its current buffering mode.
# Raises iox::IO if the stream is closed.
#
fun get_writer strm_mv
    =
    {   stream = lock_and_check_closed_out (strm_mv, "getWriter");

        stream -> OUTPUT_STREAM_INFO { filewriter, buffering_mode, ... };

        result = (filewriter, *buffering_mode);

        put_in_maildrop (strm_mv, stream);

        result;
    };
# Position operations on outstreams
#
# An output position remembers the stream it was taken from
# along with the file position itself.
Out_Position
=
OUT_POSITION {
pos: drv::File_Position,
stream: Output_Stream
};
# Return the current file position of an output stream, after flushing
# any buffered output so that the position reflects everything written.
#
# Raises iox::IO (op "get_output_position") if the stream is closed,
# if flushing or querying the position fails, or -- with cause
# RANDOM_ACCESS_IO_NOT_SUPPORTED -- if the filewriter has no
# get_file_position function.
#
# The stream record is put back into strm_mv exactly once on every path.
# Previously the trailing 'then release()' was attached to the
# (always-raising) fallback branch only, so a successful call never put
# the record back and left the stream locked; the closed-stream check
# also reported op "getWriter".
#
fun get_output_position strm_mv
    =
    {   (lock_and_check_closed_out (strm_mv, "get_output_position"))
            ->
            (stream as OUTPUT_STREAM_INFO { filewriter, ... });

        fun release ()
            =
            put_in_maildrop (strm_mv, stream);

        # On failure flush_buffer puts the record back itself before raising.
        #
        flush_buffer (strm_mv, stream, "get_output_position");

        position
            =
            case filewriter
                #
                drv::FILEWRITER { get_file_position => THE f, ... }
                    =>
                    OUT_POSITION { pos => f(), stream => strm_mv }
                    except ex
                        =
                        {   release();
                            #
                            output_exn (stream, "get_output_position", ex);
                        };

                _   =>
                    {   release();
                        #
                        output_exn (stream, "get_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                    };
            esac;

        # Only reached on success; both failure paths above released already.
        #
        release();

        position;
    };
# Extract the file position from an output position.
# The originating stream is locked and released again first, which
# raises iox::IO (op "filePosOut") if that stream has been closed.
#
fun file_pos_out (OUT_POSITION { pos, stream => strm_mv })
    =
    {   locked = lock_and_check_closed_out (strm_mv, "filePosOut");

        put_in_maildrop (strm_mv, locked);

        pos;
    };
# Move the stream recorded in an output position back to that position.
#
# Raises iox::IO (op "set_output_position") if the stream is closed,
# if the filewriter's set_file_position function fails, or -- with
# cause RANDOM_ACCESS_IO_NOT_SUPPORTED -- if the filewriter has no such
# function.  (The last case used to report op "get_output_position",
# a copy-paste slip.)
#
fun set_output_position (OUT_POSITION { pos, stream => strm_mv })
    =
    {   (lock_and_check_closed_out (strm_mv, "set_output_position"))
            ->
            (stream as OUTPUT_STREAM_INFO { filewriter, ... });

        fun release ()
            =
            put_in_maildrop (strm_mv, stream);

        case filewriter
            #
            drv::FILEWRITER { set_file_position => THE f, ... }
                =>
                (f pos)
                except ex
                    =
                    {   release ();
                        #
                        output_exn (stream, "set_output_position", ex);
                    };

            _   =>  {   release();
                        #
                        output_exn (stream, "set_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                    };
        esac;

        # Only reached on success; both failure paths above released already.
        #
        release();
    };
# Change the buffering mode of an output stream.
# When switching to NO_BUFFERING, pending output is written out first.
#
fun set_buffering_mode (strm_mv, mode)
    =
    {   stream = lock_and_check_closed_out (strm_mv, "setBufferMode");

        stream -> OUTPUT_STREAM_INFO { buffering_mode, ... };

        case mode
            #
            iox::NO_BUFFERING => flush_buffer (strm_mv, stream, "setBufferMode");
            _                 => ();
        esac;

        buffering_mode := mode;

        put_in_maildrop (strm_mv, stream);
    };
# Return the current buffering mode of an output stream.
#
fun get_buffering_mode strm_mv
    =
    {   # * should we be checking for closed streams here??? * XXX QUERO FIXME
        #
        stream = lock_and_check_closed_out (strm_mv, "getBufferMode");

        stream -> OUTPUT_STREAM_INFO { buffering_mode, ... };

        mode = *buffering_mode;

        put_in_maildrop (strm_mv, stream);

        mode;
    };
# Text stream specific operations
#
# Write the characters of substring 'ss' to the output stream held in
# strm_mv, honouring the stream's buffering mode.  The stream record is
# held for the whole operation and put back at the end; every error path
# puts it back before raising iox::IO with op "write_substring".
#
# Two defects in the buffered path are fixed here:
#   * 'insert' used to rebind data_len to the length of the WHOLE base
#     vector, so the size tests and copies used the wrong length whenever
#     the substring was shorter than its base string;
#   * after topping up and writing out a full buffer, the remainder was
#     copied from offset 'avail' of the base vector instead of
#     'data_start + avail'.
#
fun write_substring (strm_mv, ss)
    =
    {   (lock_and_check_closed_out (strm_mv, "write_substring"))
            ->
            (stream as OUTPUT_STREAM_INFO os);

        fun release ()
            =
            put_in_maildrop (strm_mv, stream);

        # v is the base vector; the substring itself is the data_len
        # elements of v starting at data_start (see write_direct below).
        #
        (burst_substring ss) -> (v, data_start, data_len);

        os -> { buffer, first_free_byte_in_buffer, buffering_mode, ... };

        buf_len = wcv::length buffer;

        # Write out the buffered elements (flush_buffer releases the stream itself on error).
        #
        fun flush ()
            =
            flush_buffer (strm_mv, stream, "write_substring");

        # Write out the entire buffer, regardless of first_free_byte_in_buffer.
        #
        fun flush_all ()
            =
            (os.write_rw_vector (wcs::make_full_slice buffer)
             except ex
                 =
                 {   release();
                     #
                     output_exn (stream, "write_substring", ex);
                 }
            );

        # Bypass the buffer: write out anything pending, then the substring itself.
        #
        fun write_direct ()
            =
            {   case *first_free_byte_in_buffer
                    #
                    0 => ();
                    n => {  os.write_rw_vector (wcs::make_slice (buffer, 0, THE n));
                            #
                            first_free_byte_in_buffer := 0;
                         };
                esac;

                os.write_vector (cvs::make_slice (v, data_start, THE data_len));
            }
            except ex
                =
                {   release ();
                    #
                    output_exn (stream, "write_substring", ex);
                };

        # Buffered write.  copy_vector copies a range of v into the buffer
        # and reports whether the copied data calls for a flush.
        #
        fun insert copy_vector
            =
            if (data_len >= buf_len)
                #
                # Data at least as large as the buffer is never buffered.
                write_direct ();
            else
                i     = *first_free_byte_in_buffer;
                avail = buf_len - i;

                if (avail < data_len)
                    #
                    # Does not fit: top the buffer up, write the full buffer
                    # out, then start over at offset 0 with the rest.
                    wcs::copy_vector
                      { from => cvs::make_slice (v, data_start, THE avail),
                        into => buffer,
                        at   => i
                      };
                    flush_all();
                    needs_flush = copy_vector (v, data_start + avail, data_len - avail, buffer, 0);
                    first_free_byte_in_buffer := data_len - avail;
                    needs_flush ?: flush ();
                else
                    needs_flush
                        =
                        copy_vector (v, data_start, data_len, buffer, i);

                    first_free_byte_in_buffer := i + data_len;

                    # Also flush when the data filled the buffer exactly.
                    if (needs_flush or avail == data_len)   flush();   fi;
                fi;
            fi;

        case *buffering_mode
            #
            iox::NO_BUFFERING    => write_direct ();
            iox::LINE_BUFFERING  => insert line_buf_copy_vec;
            iox::BLOCK_BUFFERING => insert block_buf_copy_vec;
        esac;

        release();
    };
}; # pure_io
# The imperative streams exported by this package are maildrops
# holding the corresponding pur:: stream value.
Vector = cv::Vector;
Element = cv::Element;
Input_Stream = Maildrop( pur::Input_Stream );
Output_Stream = Maildrop( pur::Output_Stream );
# Input operations
#
# Read the readily available input from an imperative stream:
# take the functional stream out of the cell, read from it, and
# store the advanced stream back.
#
fun read stream
    =
    {   current = take_from_maildrop stream;

        (pur::read current) -> (chunk, advanced);

        put_in_maildrop (stream, advanced);

        chunk;
    };
# Read one element from an imperative input stream.
# Returns THE element, or NULL at end of stream.
#
# The functional stream is taken out of the cell for the duration of
# the read and must be put back on every outcome.  The NULL case used
# to skip this, leaving the cell empty, so any later operation that
# takes from the cell blocked forever.
#
fun read_one stream
    =
    {   stream' = take_from_maildrop stream;

        case (pur::read_one stream')
            #
            THE (element, stream'')
                =>
                {   put_in_maildrop (stream, stream'');
                    #
                    THE element;
                };

            NULL
                =>
                {   # Nothing was consumed: restore the stream unchanged.
                    #
                    put_in_maildrop (stream, stream');
                    #
                    NULL;
                };
        esac;
    };
# Read up to n elements from an imperative input stream.
#
fun read_n (stream, n)
    =
    {   current = take_from_maildrop stream;

        (pur::read_n (current, n)) -> (chunk, advanced);

        put_in_maildrop (stream, advanced);

        chunk;
    };
# Read everything that remains on an imperative input stream.
#
fun read_all (stream: Input_Stream)
    =
    {   current = take_from_maildrop stream;

        (pur::read_all current) -> (contents, advanced);

        put_in_maildrop (stream, advanced);

        contents;
    };
# Mailop-value constructors:
#
stipulate
# What the helper thread sends back over the reply slot:
# a value, or an exception to be re-raised by the receiver.
Result(X)
= RESULT X
| EXCEPTION Exception
;
# Mailop that delivers value 'v' over 'slot' as a successful result.
#
fun send_mailop (slot, v)
    =
    {   outcome = RESULT v;

        put_in_mailslot' (slot, outcome);
    };
# Mailop that delivers exception 'exn' over 'slot' as a failed result.
#
fun send_exn_mailop (slot, exn)
    =
    {   outcome = EXCEPTION exn;

        put_in_mailslot' (slot, outcome);
    };
# Mailop that receives a Result from 'slot' and unwraps it:
# yields the value of a RESULT, re-raises the exception of an EXCEPTION.
fun receive' slot
=
take_from_mailslot' slot
==>
\\ (RESULT v ) => v;
(EXCEPTION exn) => raise exception exn;
end;
# Shared machinery for the input mailops below.
#
# input_mailop maps a functional stream to a mailop yielding
# (result, advanced-stream).  'stream' is the imperative stream cell and
# 'nack' is the negative-acknowledgement mailop passed in by
# dynamic_mailop_with_nack.  A helper thread takes the stream out of its
# cell, performs the input, and offers the result on a fresh reply slot;
# the returned mailop receives from that slot.
#
# Whichever way the helper thread finishes, it refills the stream cell:
# with the advanced stream when the result was delivered, and with the
# original stream when the nack fired or an exception was delivered.
#
fun do_input input_mailop (stream: Input_Stream) nack
=
{ reply_slot = make_mailslot ();
#
fun input_thread ()
=
{
stream' = take_from_maildrop stream;
#
# Abandoned by the client: restore the stream as it was before the read.
nack_mailop
=
nack ==> (\\ _ = {
put_in_maildrop (stream, stream');
}
);
# Input completed: deliver the result (then store the advanced
# stream), unless the client gives up first.
fun handle_input (result, stream'')
=
do_one_mailop [
#
send_mailop (reply_slot, result)
==>
(\\ _ = {
put_in_maildrop (stream, stream'');
}
),
nack_mailop
];
( do_one_mailop [
#
input_mailop stream' ==> handle_input,
nack_mailop
]
)
# Input failed: deliver the exception instead, restoring the original stream.
except exn
=
do_one_mailop [
#
send_exn_mailop (reply_slot, exn)
==>
(\\ _ = {
put_in_maildrop (stream, stream');
}
),
nack_mailop
];
};
make_thread "text I/O II" input_thread;
receive' reply_slot;
};
herein
# Mailop version of read_one: yields THE element, or NULL at end of stream.
fun input1evt (stream: Input_Stream)
=
dynamic_mailop_with_nack
#
(do_input input_mailop stream)
where
# Adapt pur::input1evt to the (result, next-stream) shape do_input expects.
# Here 'stream' is the functional stream argument (it shadows the outer
# cell), so at end of stream the stream is handed back unchanged.
fun input_mailop (stream: pur::Input_Stream)
=
pur::input1evt stream
==>
\\ THE (s, stream') => (THE s, stream');
NULL => (NULL, stream);
end;
end;
# Mailop version of read.
#
fun input_mailop stream
    =
    {   worker = do_input pur::input_mailop stream;

        dynamic_mailop_with_nack worker;
    };
# Mailop version of read_n.
#
fun input_nevt (stream, n)
    =
    {   # Fix the element count so the operation takes just a stream, as do_input expects.
        #
        fun bounded_read stream'
            =
            pur::input_nevt (stream', n);

        dynamic_mailop_with_nack (do_input bounded_read stream);
    };
# Mailop version of read_all.
#
fun input_all_mailop stream
    =
    {   worker = do_input pur::input_all_mailop stream;

        dynamic_mailop_with_nack worker;
    };
end; # stipulate
# Look at the next element without consuming it: the element is read
# from the functional stream, but the advanced stream is discarded and
# the cell is left untouched.
#
fun peek (stream: Input_Stream)
    =
    {   current = thk::get_from_maildrop stream;

        case (pur::read_one current)
            #
            NULL             => NULL;
            THE (element, _) => THE element;
        esac;
    };
# Close an imperative input stream and leave the cell
# holding a stream positioned at the end of the buffered data.
#
fun close_input stream
    =
    {   current = take_from_maildrop stream;

        current -> pur::INPUT_STREAM (buffer, _);

        pur::close_input current;

        put_in_maildrop (stream, pur::find_eos buffer);
    };
# Report whether an imperative input stream is at its end.
#
fun end_of_stream stream
    =
    {   current = thk::get_from_maildrop stream;

        pur::end_of_stream current;
    };
/*
fun getPosIn stream = pur::getPosIn (md::mGet stream)
fun setPosIn (stream, p) = mUpdate (stream, pur::setPosIn p)
*/
# Output operations:
#
# Each of these fetches the current pur:: output stream out of the
# imperative stream cell (with get_from_maildrop) and delegates to the
# pur:: function of the same name.
fun write (stream, v) = pur::write (thk::get_from_maildrop stream, v);
fun write_one (stream, c) = pur::write_one (thk::get_from_maildrop stream, c);
fun flush stream = pur::flush (thk::get_from_maildrop stream);
fun close_output stream = pur::close_output (thk::get_from_maildrop stream);
fun get_output_position stream
=
pur::get_output_position (thk::get_from_maildrop stream);
# Point the imperative stream at the pur:: stream recorded in
# position 'p', then move that stream back to the recorded position.
#
fun set_output_position (stream, p as pur::OUT_POSITION { stream => target, ... })
    =
    {   m_update (stream, target);

        pur::set_output_position p;
    };
# Conversions between pur:: streams and imperative streams:
# make_* wraps a pur:: stream in a fresh full maildrop, get_* reads the
# current contents of the cell, set_* replaces them (via m_update).
fun make_instream (stream: pur::Input_Stream ) = make_full_maildrop stream;
fun get_instream (stream: Input_Stream ) = thk::get_from_maildrop stream;
fun set_instream (stream: Input_Stream , stream') = m_update (stream, stream');
fun make_outstream (stream: pur::Output_Stream) = make_full_maildrop stream;
fun get_outstream (stream: Output_Stream) = thk::get_from_maildrop stream;
fun set_outstream (stream: Output_Stream, stream') = m_update (stream, stream');
# Choose the buffering mode for a filewriter: line buffering when its
# I/O descriptor is a character device, block buffering otherwise
# (including when it has no I/O descriptor at all).
#
fun buffering (drv::FILEWRITER { io_descriptor, ... })
    =
    case io_descriptor
        #
        NULL    =>  iox::BLOCK_BUFFERING;

        THE iod =>  if (winix__premicrothread::io::iod_to_iodkind iod == wty::CHAR_DEVICE)
                        #
                        iox::LINE_BUFFERING;
                    else
                        iox::BLOCK_BUFFERING;
                    fi;
    esac;
# Open files:
#
# Open the named file as an imperative input stream.
# Any failure is reported as iox::IO with op "open_for_read".
#
fun open_for_read fname
    =
    {   reader = wxd::open_for_read fname;

        make_instream (pur::make_instream (reader, empty_string));
    }
    except ex
        =
        raise exception iox::IO { op=>"open_for_read", name=>fname, cause=>ex };
# Open the named file as an imperative output stream, with the
# buffering mode chosen by 'buffering' for its filewriter.
#
# Any failure -- including failure to open the file -- is reported as
# iox::IO with op "open_for_write".  Previously the handler covered only
# the stream construction, so an exception from wxd::open_for_write
# itself escaped unwrapped, unlike open_for_read and open_for_append;
# the op was also reported as just "open".
#
fun open_for_write fname
    =
    {   wr = wxd::open_for_write fname;
        #
        make_outstream (pur::make_outstream (wr, buffering wr));
    }
    except ex
        =
        raise exception iox::IO { op=>"open_for_write", name=>fname, cause=>ex };
# Open the named file for appending, as an unbuffered imperative output stream.
# Any failure is reported as iox::IO with op "open_for_append".
#
fun open_for_append fname
    =
    {   writer = wxd::open_for_append fname;

        make_outstream (pur::make_outstream (writer, iox::NO_BUFFERING));
    }
    except ex
        =
        raise exception iox::IO { op=>"open_for_append", name=>fname, cause=>ex };
# Text stream specific operations:
#
# Read one line from an imperative input stream.
# Returns THE line, or NULL at end of stream.
#
# The functional stream is taken out of the cell for the duration of
# the read and must be put back on every outcome.  The NULL case used
# to skip this (the put lived inside null_or::map, which never runs its
# function on NULL), leaving the cell empty after the final read_line;
# a subsequent close_input or read on the same stream then blocked
# forever.
#
fun read_line stream
    =
    {   stream' = take_from_maildrop stream;

        case (pur::read_line stream')
            #
            THE (line, stream'')
                =>
                {   put_in_maildrop (stream, stream'');
                    #
                    THE line;
                };

            NULL
                =>
                {   # Nothing was consumed: restore the stream unchanged.
                    #
                    put_in_maildrop (stream, stream');
                    #
                    NULL;
                };
        esac;
    };
# Write a substring to an imperative output stream.
#
fun write_substring (stream, ss)
    =
    {   out = thk::get_from_maildrop stream;

        pur::write_substring (out, ss);
    };
# Open an imperative input stream that reads from string 'src'.
# Any failure is reported as iox::IO with op "open_for_read" and name "<string>".
#
fun open_string src
    =
    {   reader = wxd::string_reader src;

        make_instream (pur::make_instream (reader, empty_string));
    }
    except cause
        =
        raise exception iox::IO { op => "open_for_read", name => "<string>", cause };
#
# Read all remaining lines of an input stream, in order.
#
fun read_lines input_stream
    =
    {   # Lines are accumulated newest-first and reversed once at the end.
        #
        fun gather collected
            =
            case (read_line input_stream)
                #
                THE line => gather (line ! collected);
                NULL     => reverse collected;
            esac;

        gather [];
    };
#
# Return the contents of the named file as a list of lines.
# The stream is not closed explicitly here; see the note below.
fun as_lines filename
=
{
fd = open_for_read filename;
result = read_lines fd;
# close_input fd; # 2015-06-15 CrT: This was blocking forever on first lines of close_input.
# Changed extend_stream to call close_in_global_file_stuff before returning EOF, which I think makes it safe to comment out this close_input call. Our file-input libraries are WAAY too much code for too little benefit. :-( XXX SUCKO FIXME.
result;
};
#
# Write the given lines, in order, to the named file;
# then flush and close it.
#
fun from_lines filename lines
    =
    {   out = open_for_write filename;

        map  (\\ line = write (out, line))  lines;

        flush out;

        close_output out;
    };
# Instantiate the generic mailslot I/O package for character data,
# passing this package's 'drv' through as its driver package.
#
package mailslot_io
    =
    winix_mailslot_io_g (                               # winix_mailslot_io_g           is from   src/lib/std/src/io/winix-mailslot-io-g.pkg
        #
        package drv =  drv;
        package rv  =  vector_of_chars;                 # vector_of_chars               is from   src/lib/std/vector-of-chars.pkg
        package wv  =  rw_vector_of_chars;              # rw_vector_of_chars            is from   src/lib/std/rw-vector-of-chars.pkg
        package rvs =  vector_slice_of_chars;           # vector_slice_of_chars         is from   src/lib/std/src/vector-slice-of-chars.pkg
        package wvs =  rw_vector_slice_of_chars;        # rw_vector_slice_of_chars      is from   src/lib/std/src/rw-vector-slice-of-chars.pkg
    );
# Open an input stream that is fed from the given mailslot.
#
# The slot is turned into a file reader by
# mailslot_io::make_filereader and then wrapped
# like any other reader.
#
fun open_slot_in slot
    =
    {   reader =  mailslot_io::make_filereader slot;
        #
        make_instream (pur::make_instream (reader, empty_string));
    };
# Open an output stream that feeds the given mailslot.
#
# The slot is turned into a file writer by
# mailslot_io::make_filewriter; the resulting
# stream uses iox::NO_BUFFERING.
#
fun open_slot_out ch
    =
    {   writer =  mailslot_io::make_filewriter ch;
        #
        make_outstream (pur::make_outstream (writer, iox::NO_BUFFERING));
    };
# * Standard streams *
#
stipulate
#
# Build a fresh pure input stream on top of wxd::stdin().
#
# When 'rebind' is TRUE the stream's tag is also passed to
# eow::change_stream_startup_and_shutdown_actions together
# with dummy_cleaner; when FALSE that call is skipped.
#
# Returns the new stream.
#
fun make_std_in rebind
    =
    {   my (tag, stream)
            =
            pur::make_instream' (wxd::stdin (), empty_string);
        #
        if rebind   eow::change_stream_startup_and_shutdown_actions (tag, dummy_cleaner);   fi;
        #
        stream;
    };
# Build a fresh pure output stream on top of wxd::stdout(),
# with the buffering mode that 'buffering' picks for that writer.
#
# When 'rebind' is TRUE the stream's tag is also passed to
# eow::change_stream_startup_and_shutdown_actions together with
# an action that flushes the stream; when FALSE that call is skipped.
#
# Returns the new stream.
#
fun make_std_out rebind
    =
    {   writer =  wxd::stdout ();
        #
        my (tag, stream)
            =
            pur::make_outstream' (writer, buffering writer);
        #
        fun flush_it ()
            =
            pur::flush stream;
        #
        if rebind   eow::change_stream_startup_and_shutdown_actions (tag, flush_it);   fi;
        #
        stream;
    };
# Build a fresh pure output stream on top of wxd::stderr(),
# using iox::NO_BUFFERING.
#
# When 'rebind' is TRUE the stream's tag is also passed to
# eow::change_stream_startup_and_shutdown_actions together with
# an action that flushes the stream; when FALSE that call is skipped.
#
# Returns the new stream.
#
fun make_std_err rebind
    =
    {   my (tag, stream)
            =
            pur::make_outstream' (wxd::stderr (), iox::NO_BUFFERING);
        #
        fun flush_it ()
            =
            pur::flush stream;
        #
        if rebind   eow::change_stream_startup_and_shutdown_actions (tag, flush_it);   fi;
        #
        stream;
    };
herein
# Build the standard streams.
#
# Each one wraps a freshly built pure stream; make_std_* is
# called with FALSE, so no startup/shutdown action is
# registered for these initial streams.
#
# Since we are not currently running threadkit,
# we cannot do the cleaner renaming here,
# but that is okay,
# since these are just place holders.
#
stdin = make_instream (make_std_in FALSE);
stdout = make_outstream (make_std_out FALSE);
stderr = make_outstream (make_std_err FALSE);
# Write the string 's' to stdout and flush it immediately.
#
# The current stdout stream is removed from the stdout maildrop
# while the write and flush run, and is put back afterwards.
#
# It is put back on failure as well: if pur::write or pur::flush
# raises, the stream is returned to the maildrop before the
# exception is re-raised, so one failed print does not leave
# stdout without a stream for later callers.
#
fun print s
    =
    {   stream' = take_from_maildrop stdout;
        #
        fun emit ()
            =
            {   pur::write (stream', s);
                pur::flush stream';
            };
        #
        emit ()
        except
            ex = {  put_in_maildrop (stdout, stream');
                    raise exception ex;
                 };
        #
        put_in_maildrop (stdout, stream');
    };
# Turn a scanner generator into a scanner over imperative streams.
#
# 'scan_g' is applied once to pur::read_one to obtain a scanner
# over pure input streams.  The returned function, given a stream:
#
#   o fetches the stream's current pure instream with get_instream;
#   o runs the scanner on it;
#   o on THE (item, rest) stores 'rest' back with set_instream
#     and returns THE item;
#   o on NULL returns NULL and leaves the stream untouched.
#
fun scan_stream scan_g
    =
    {   scan =  scan_g pur::read_one;
        #
        \\ stream = null_or::map
                        (\\ (item, rest) = {   set_instream (stream, rest);
                                               item;
                                           }
                        )
                        (scan (get_instream stream));
    };
# Establish a hook function to rebuild the I/O stack
#
# The function stored in eow::std_stream_hook, when called:
#
#   o builds new standard streams with make_std_* TRUE (so their
#     startup/shutdown actions are registered) and installs them
#     into stdin, stdout and stderr via set_instream/set_outstream;
#
#   o points ri::print_hook at this package's 'print'.
#
# Evaluating this binding only stores the function; nothing
# is rebuilt here.
#
my _ =
eow::std_stream_hook
:=
(\\ () = { set_instream (stdin, make_std_in TRUE);
set_outstream (stdout, make_std_out TRUE);
set_outstream (stderr, make_std_err TRUE);
#
ri::print_hook := print;
}
);
end; # stipulate
end; # stipulate
# Return the result of psx::stat::is_file applied to the
# psx::stat of 'filename'.
#
# Any exception raised while doing so (for example because
# the stat call fails) is swallowed and reported as FALSE.
#
fun exists filename
    =
    {   fun is_plain_file path
            =
            psx::stat::is_file (psx::stat path);
        #
        is_plain_file filename
        except
            _ = FALSE;
    };
}; # winix_text_file_for_os_g
end;
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.