## winix-data-file-for-os-g.pkg
#
# Generic package which combines platform-specific code wxd
# with platform-agnostic body code to produce a complete
# platform-specific binary-file package.
#
# This is the binary-file equivalent to
#
#     src/lib/std/src/io/winix-text-file-for-os-g.pkg
#
# It is the multithread alternative to
#
#     src/lib/std/src/io/winix-data-file-for-os-g--premicrothread.pkg
#
# This is the threadkit version of the binary_file generic package
#
#     src/lib/std/src/io/winix-data-file-for-os-g--premicrothread.pkg

# Compiled by:
#
#     src/lib/std/standard.lib

stipulate
    package dio = winix_base_data_file_io_driver_for_posix;    # winix_base_data_file_io_driver_for_posix is from   src/lib/std/src/io/winix-base-data-file-io-driver-for-posix.pkg
    package thk = threadkit;                                   # threadkit is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg
    #
    package iox = io_exceptions;                               # io_exceptions is from   src/lib/std/src/io/io-exceptions.pkg
    package eow = io_startup_and_shutdown;                     # "eow" == "end of world"   # io_startup_and_shutdown is from   src/lib/std/src/io/io-startup-and-shutdown.pkg
    #
    package wv = rw_vector_of_one_byte_unts;                   # rw_vector_of_one_byte_unts is from   src/lib/std/src/rw-vector-of-one-byte-unts.pkg
    package wvs = rw_vector_slice_of_one_byte_unts;            # rw_vector_slice_of_one_byte_unts is from   src/lib/std/src/rw-vector-slice-of-one-byte-unts.pkg
    #
    package rv = vector_of_one_byte_unts;                      # vector_of_one_byte_unts is from   src/lib/std/src/vector-of-one-byte-unts.pkg
    package rvs = vector_slice_of_one_byte_unts;               # vector_slice_of_one_byte_unts is from   src/lib/std/src/vector-slice-of-one-byte-unts.pkg
    #
    package pos = file_position;                               # file_position is from   src/lib/std/file-position.pkg
herein
# This generic is invoked (only) in:
#
#     src/lib/std/src/posix/data-file.pkg
#
generic package winix_data_file_for_os_g (
# ========================
# # Winix_Extended_File_Io_Driver_For_Os is from   src/lib/std/src/io/winix-extended-file-io-driver-for-os.api
package wxd # "wxd" == "WiniX file io Driver"
:
Winix_Extended_File_Io_Driver_For_Os
where drv::Rw_Vector == dio::Rw_Vector
where drv::Vector == dio::Vector
where drv::Rw_Vector_Slice == dio::Rw_Vector_Slice
where drv::Vector_Slice == dio::Vector_Slice
where drv::Element == dio::Element
where drv::File_Position == dio::File_Position
where drv::Filereader == dio::Filereader
where drv::Filewriter == dio::Filewriter;
)
: (weak) Winix_Data_File_For_Os # Winix_Data_File_For_Os is from   src/lib/std/src/io/winix-data-file-for-os.api
{
include package threadkit; # threadkit is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg
#
package drv = wxd::drv; # wxd is our argument.
# Replace the value held in a maildrop:
# the current contents are removed with take_from_maildrop
# and the new value is then stored with put_in_maildrop.
#
fun update_maildrop (drop, replacement)
    =
    {   take_from_maildrop drop;
        put_in_maildrop (drop, replacement);
    };
# Small constants and short aliases used throughout this package.
#
some_element = (0u0: one_byte_unt::Unt); # An element for initializing buffers:
# Extract a vector from a (vector, start, optional-length) triple by way of a slice:
vec_extract = rvs::to_vector o rvs::make_slice; # "vec" should be renamed to "vector" throughout here. XXX SUCKO FIXME.
vec_get = rv::get; # Element fetch on read-only vectors.
rw_vec_set = wv::set; # Element store on read-write vectors.
empty_vec = rv::from_list []; # The zero-length vector; returned by reads at end of stream.
# Placeholder cleanup action: registered first, then replaced once the stream exists.
fun dummy_cleaner () = ();
package pur { # "pur" is short for "pure" (I/O).
#
# Types of the pure layer: vectors and elements come from rv
# (vector_of_one_byte_unts); readers, writers and file positions
# come from the driver package drv.
#
Vector = rv::Vector;
Element = rv::Element;
Filereader = drv::Filereader;
Filewriter = drv::Filewriter;
File_Position = drv::File_Position;
# ** Functional input streams **
#
# An input stream is a position (the Int) within one buffer of a
# forward-linked chain of buffers.  All four types below are
# mutually recursive, hence the 'also' chain.
#
Input_Stream
=
INPUT_STREAM (Input_Buffer, Int)
also
Input_Buffer
=
INPUT_BUFFER
{
data: Vector, # The bytes held by this buffer.
file_position: Null_Or( File_Position ), # File position recorded for the start of 'data'; NULL when unknown.
nextdrop: Maildrop( Next ), # When this cell is empty, it means that
# there is an outstanding request to the
# server to extend the stream.
global_file_stuff: Global_File_Stuff # State shared by every buffer of the same stream.
}
also
Next
= NEXT Input_Buffer # Forward link to additional data.
| NO_NEXT
# Placeholder for forward link.
| TERMINATED
# Termination of the stream.
also
Global_File_Stuff
=
GLOBAL_FILE_STUFF
{
filereader: Filereader, # The underlying driver-level reader.
read_vector: Int -> Vector, # Blocking read of a chunk, given a requested size.
read_vector_mailop: Int -> thk::Mailop( Vector ), # Mailop form of read_vector.
is_closed: Ref( Bool ), # Set to TRUE by close_in_global_file_stuff.
get_file_position: Void -> Null_Or( File_Position ), # Current file position, or NULL when not available.
last_nextref: Maildrop( Maildrop( Next ) ), # Points to the next cell of the last buffer.
clean_tag: eow::Tag # Tag of the startup/shutdown actions registered for this stream.
};
# Project the per-stream shared record out of an input buffer.
#
fun global_file_stuff_of_ibuf ibuf
    =
    {   ibuf -> INPUT_BUFFER { global_file_stuff => shared, ... };
        #
        shared;
    };
# Return the best_io_quantum field of the filereader
# underlying the stream that 'buf' belongs to.
#
fun best_io_quantum_of_ibuf (INPUT_BUFFER { global_file_stuff => GLOBAL_FILE_STUFF { filereader => drv::FILEREADER { best_io_quantum => quantum, ... }, ... }, ... } )
    =
    quantum;
# Return the read_vector function stored in the
# shared record of the stream that 'ibuf' belongs to.
#
fun read_vector ibuf
    =
    {   ibuf -> INPUT_BUFFER { global_file_stuff => GLOBAL_FILE_STUFF { read_vector => read_fn, ... }, ... };
        #
        read_fn;
    };
# Raise iox::IO for an input stream.  The exception names the
# file via the filereader's 'filename' field and records the
# failing operation 'ml_op' and the underlying 'cause'.
#
fun raise_io_exception (shared, ml_op, cause)
    =
    {   shared -> GLOBAL_FILE_STUFF { filereader => drv::FILEREADER { filename, ... }, ... };
        #
        raise exception iox::IO { op => ml_op, name => filename, cause };
    };
# Result of asking for the buffer that follows a given one:
# EOF when no more data could be obtained, otherwise DATA with the next buffer.
#
Next_Data = EOF
| DATA Input_Buffer
;
# Extend the stream by a chunk.
# Invariant: the next m-variable is empty on entry and full on exit.
#
# Arguments:
#     read_fn   Called with the buffer's best_io_quantum to fetch the next chunk.
#     ml_op     Operation name recorded in any iox::IO exception raised from here.
#     buf       The buffer being extended; the caller has already emptied its nextdrop.
#
# Result:
#     EOF       if read_fn returned a zero-length chunk (nextdrop is refilled with NO_NEXT);
#     DATA buf' otherwise, where buf' is a new buffer holding the chunk and linked after buf.
#
# If anything in the body raises an exception, nextdrop is refilled with
# NO_NEXT and the exception is re-raised wrapped in iox::IO.
#
fun extend_stream (read_fn, ml_op, buf as INPUT_BUFFER { nextdrop, global_file_stuff, ... } )
=
{ global_file_stuff -> GLOBAL_FILE_STUFF { get_file_position, last_nextref, ... };
#
# Sample the file position before reading, so it describes the start of the new chunk:
file_position = get_file_position ();
chunk = read_fn (best_io_quantum_of_ibuf buf);
if (rv::length chunk == 0)
#
put_in_maildrop (nextdrop, NO_NEXT);
EOF;
else
new_next = make_empty_maildrop ();
buf' = INPUT_BUFFER {
file_position,
data => chunk,
nextdrop => new_next,
global_file_stuff
};
# Note that we do not fill the new_next cell until
# after the last_nextref has been updated. This ensures
# that someone attempting to access the last_nextref will
# not acquire the lock until after we are done.
#
update_maildrop (last_nextref, new_next);
put_in_maildrop (nextdrop, NEXT buf'); # releases lock!!
put_in_maildrop (new_next, NO_NEXT);
DATA buf';
fi;
}
except
ex = {
# Restore the invariant (nextdrop full) before reporting the failure:
put_in_maildrop (nextdrop, NO_NEXT);
#
raise_io_exception (global_file_stuff, ml_op, ex);
};
# Get the next buffer in the stream,
# extending it if necessary.
# If the stream must be extended,
# we lock it by taking the value from the
# next cell; the extend_stream function
# is responsible for filling in the cell.
#
# Arguments:
#     read_fn, ml_op   Passed through to extend_stream when an extension is needed.
#     buf              The buffer whose successor is wanted.
#
# Result: EOF for a TERMINATED stream or when the extension hits end of file,
# otherwise DATA with the following buffer.
#
fun get_next_buffer (read_fn, ml_op) (buf as INPUT_BUFFER { nextdrop, global_file_stuff, ... } )
=
# First look at the cell without taking it:
get (thk::get_from_maildrop nextdrop)
where
fun get TERMINATED => EOF;
get (NEXT buf') => DATA buf';
get NO_NEXT
=>
# Take the cell, then re-check what we actually took,
# since the contents may have changed since the peek above:
case (take_from_maildrop nextdrop)
#
NO_NEXT => extend_stream (read_fn, ml_op, buf);
other => {
put_in_maildrop (nextdrop, other);
get other;
};
esac;
end;
end;
# Build a reader that fetches a chunk of at least the requested size.
#
# The request is rounded up to the next multiple of the filereader's
# best_io_quantum; when the quantum is 1 no rounding is needed and the
# stream's read_vector function is used as-is.
#
fun read_chunk buf
    =
    {   buf -> INPUT_BUFFER { global_file_stuff => GLOBAL_FILE_STUFF { read_vector => fetch, filereader => drv::FILEREADER { best_io_quantum => quantum, ... }, ... }, ... };

        slack = quantum - 1;

        if (slack == 0)
            #
            fetch;
        else
            # Round up to next multiple of the quantum:
            #
            (\\ wanted = fetch (int::quot (wanted + slack, quantum) * quantum));
        fi;
    };
# Build an input function from 'get_buf', a function which
# yields the buffer following a given one (or EOF).
#
# The resulting function returns whatever is left unread in the
# current buffer.  If nothing is left it advances to the next buffer
# and tries again, returning the empty vector once get_buf reports EOF.
# In both terminal cases the returned stream sits at the end of the
# buffer it refers to.
#
fun generalized_input get_buf
    =
    {   fun pull (INPUT_STREAM (cur, offset))
            =
            {   cur -> INPUT_BUFFER { data => bytes, ... };

                size = rv::length bytes;

                if (offset >= size)
                    #
                    case (get_buf cur)
                        #
                        DATA more =>  pull (INPUT_STREAM (more, 0));
                        EOF       =>  (empty_vec, INPUT_STREAM (cur, size));
                    esac;
                else
                    (vec_extract (bytes, offset, NULL), INPUT_STREAM (cur, size));
                fi;
            };

        pull;
    };
# Terminate an input stream
#
# Follows last_nextref to the next-cell of the last known buffer,
# takes its contents, and acts on what it finds:
#     NEXT _       The value is put back and we retry from last_nextref.
#     TERMINATED   Already terminated; the marker is simply restored.
#     otherwise    The startup/shutdown actions registered under clean_tag
#                  are dropped and the cell is filled with TERMINATED.
#
fun terminate (global_file_stuff as GLOBAL_FILE_STUFF { last_nextref, clean_tag, ... } )
=
{ m = thk::get_from_maildrop last_nextref;
#
case (take_from_maildrop m)
#
(m' as NEXT _)
=>
{
put_in_maildrop (m, m');
terminate global_file_stuff;
};
TERMINATED
=>
put_in_maildrop (m, TERMINATED);
_ => { eow::drop_stream_startup_and_shutdown_actions clean_tag;
put_in_maildrop (m, TERMINATED);
};
esac;
};
# Walk the NEXT links from 'ibuf' to the last buffer currently
# in the chain, and return a stream positioned at the end of
# that buffer's data.
#
fun find_eos ibuf
    =
    {   ibuf -> INPUT_BUFFER { nextdrop => link, data => bytes, ... };

        case (thk::get_from_maildrop link)
            #
            NEXT successor =>  find_eos successor;
            _              =>  INPUT_STREAM (ibuf, rv::length bytes);
        esac;
    };
# Read whatever is available at the stream position:
# the rest of the current buffer, or else the next chunk
# obtained via the stream's read_vector function.
# Returns (data, advanced stream).
#
fun read stream
    =
    {   stream -> INPUT_STREAM (first_buf, _);

        fetch_next = get_next_buffer (read_vector first_buf, "read");

        generalized_input fetch_next stream;
    };
# Read a single element from a functional input stream.
#
# Result:
#     THE (element, stream')   with stream' positioned just past the element;
#     NULL                     if the stream is TERMINATED or extension hits EOF.
#
# When the current buffer is exhausted and its nextdrop holds NO_NEXT,
# the cell is taken and extend_stream is asked for more data.  If the
# take yields something other than NO_NEXT, that value is put back and
# handled by 'get' instead.
#
fun read_one (INPUT_STREAM (buf, pos))
=
{ buf -> INPUT_BUFFER { data, nextdrop, ... };
if (pos < rv::length data)
#
THE (vec_get (data, pos), INPUT_STREAM (buf, pos+1));
else
get (thk::get_from_maildrop nextdrop)
where
fun get (NEXT buf) => read_one (INPUT_STREAM (buf, 0));
get TERMINATED => NULL;
get NO_NEXT
=>
case (take_from_maildrop nextdrop)
#
NO_NEXT
=>
# extend_stream refills nextdrop on every path:
case (extend_stream (read_vector buf, "read_one", buf))
DATA rest => read_one (INPUT_STREAM (rest, 0));
EOF => NULL;
esac;
other =>
{
put_in_maildrop (nextdrop, other);
get other;
};
esac;
end;
end;
fi;
};
# Read up to n elements starting at the stream position.
#
# Result: (data, stream') where 'data' is the concatenation of the pieces
# gathered from successive buffers.  It holds fewer than n elements if a
# TERMINATED marker or EOF is reached first; stream' is positioned just
# past the last element returned.
#
fun read_n (INPUT_STREAM (buf, pos), n)
=
{ # Prepend one piece to the list half of a (pieces, stream) result:
fun join (item, (list, stream))
=
(item ! list, stream);
# Collect n elements starting at index i of buf, continuing into later buffers if needed:
fun input_list (buf as INPUT_BUFFER { data, ... }, i, n)
=
{ len = rv::length data;
#
remain = len-i;
if (remain >= n)
([vec_extract (data, i, THE n)], INPUT_STREAM (buf, i+n));
else
join ( vec_extract (data, i, NULL),
next_buf (buf, n-remain)
);
fi;
}
also
# Continue collecting n more elements from the buffer after buf, extending the stream if necessary:
fun next_buf (buf as INPUT_BUFFER { nextdrop, data, ... }, n)
=
get (thk::get_from_maildrop nextdrop)
where
fun get (NEXT buf)
=>
input_list (buf, 0, n);
get TERMINATED
=>
([], INPUT_STREAM (buf, rv::length data));
get NO_NEXT
=>
case (take_from_maildrop nextdrop)
#
NO_NEXT =>
case (extend_stream (read_vector buf, "read_n", buf))
#
EOF => ([], INPUT_STREAM (buf, rv::length data));
(DATA rest) => input_list (rest, 0, n);
esac;
other=> {
put_in_maildrop (nextdrop, other);
get other;
};
esac;
end;
end;
(input_list (buf, pos, n))
->
(data, stream);
(rv::cat data, stream);
};
# Read everything that remains on the stream.
#
# Result: (data, stream') where 'data' is the concatenation of all chunks
# read until an empty chunk is returned, and stream' is find_eos applied to
# the starting buffer, i.e. positioned at the end of the buffer chain.
#
fun read_all (stream as INPUT_STREAM (buf, _))
=
{ (global_file_stuff_of_ibuf buf)
->
GLOBAL_FILE_STUFF { filereader => drv::FILEREADER { avail, ... }, ... };
# Read a chunk that is as large as the available input.
# Note that for systems that use CR-LF for '\n', the
# size will be too large, but this should be okay.
#
# The argument (the quantum supplied by extend_stream) is ignored;
# the request size comes from avail(), falling back to the best_io_quantum.
fun big_chunk _
=
read_chunk buf delta
where
delta = case (avail())
#
NULL => best_io_quantum_of_ibuf buf;
THE n => n;
esac;
end;
big_input
=
generalized_input
(get_next_buffer (big_chunk, "read_all"));
# Gather chunks until an empty one signals the end:
fun loop (v, stream)
=
if (rv::length v == 0) [];
else v ! loop (big_input stream);
fi;
data = rv::cat (loop (big_input stream));
(data, find_eos buf);
};
# Placeholder mailop constructors which always raise DIE.
# These names are rebound further down in this package
# (input1evt = do_input read_one; and friends).
#
fun input1evt _ = raise exception DIE "input1Evt unimplemented";
fun input_mailop _ = raise exception DIE "inputEvt unimplemented";
fun input_nevt _ = raise exception DIE "inputNEvt unimplemented";
fun input_all_mailop _ = raise exception DIE "inputAllEvt unimplemented";
# Close an input stream given its global_file_stuff record.
# We need this function for the cleanup hook
# to avoid a space leak.
#
# Does nothing if the stream is already marked closed.  Otherwise the
# stream is terminated, marked closed, and the filereader's close
# function is called; an exception from close() is re-raised as
# iox::IO with op "close_input".
#
fun close_in_global_file_stuff (GLOBAL_FILE_STUFF { is_closed=>REF TRUE, ... } )
=>
();
close_in_global_file_stuff (global_file_stuff as GLOBAL_FILE_STUFF { is_closed, filereader => drv::FILEREADER { close, ... }, ... } )
=>
{
# ** We need some kind of lock on the input stream to do this safely!!! ** XXX BUGGO FIXME
terminate global_file_stuff;
is_closed := TRUE;
close()
except
ex = raise_io_exception (global_file_stuff, "close_input", ex);
};
end;
# Close a functional input stream by closing
# the shared per-stream record of its buffer.
#
fun close_input stream
    =
    {   stream -> INPUT_STREAM (ibuf, _);
        #
        close_in_global_file_stuff (global_file_stuff_of_ibuf ibuf);
    };
# Test whether a functional input stream is at end-of-stream.
#
# The buffer's nextdrop is taken for the duration of the test and is
# refilled on every path, either directly here or by extend_stream:
#     NEXT _ present                          => FALSE
#     pos not at the end of this buffer       => FALSE
#     at end, NO_NEXT, stream not closed      => try to extend; TRUE iff that yields EOF
#     at end otherwise (TERMINATED or closed) => TRUE
#
fun end_of_stream (INPUT_STREAM (buf as INPUT_BUFFER { nextdrop, ... }, pos))
=
case (take_from_maildrop nextdrop)
#
other as NEXT _
=>
{
put_in_maildrop (nextdrop, other);
#
FALSE;
};
other
=>
{ buf -> INPUT_BUFFER { data, global_file_stuff=>GLOBAL_FILE_STUFF { is_closed, ... }, ... };
#
if (pos == rv::length data)
#
case (other, *is_closed)
#
(NO_NEXT, FALSE)
=>
# extend_stream refills nextdrop for us:
case (extend_stream (read_vector buf, "end_of_stream", buf))
#
EOF => TRUE;
_ => FALSE;
esac;
_ =>
{
put_in_maildrop (nextdrop, other);
TRUE;
};
esac;
else
put_in_maildrop (nextdrop, other);
FALSE;
fi;
};
esac;
# Build a functional input stream on top of a driver-level filereader.
#
# Arguments:
#     filereader   The driver reader to pull data from.
#     data         Initial contents of the first buffer (may be empty).
#
# Result: a stream positioned at index 0 of a first buffer holding 'data'.
# A startup/shutdown action is registered which closes the stream via
# close_in_global_file_stuff.
#
fun make_instream (filereader, data)
=
{ filereader -> drv::FILEREADER { read_vector, read_vector_mailop, get_file_position, set_file_position, ... };
#
# Positions are only reported when the reader supports both getting and setting them:
get_file_position
=
case (get_file_position, set_file_position)
#
(THE f, THE _)
=>
(\\ () = THE (f ()));
_ =>
(\\ () = NULL);
esac;
nextdrop = make_full_maildrop NO_NEXT;
# Register a dummy action now; the real one needs global_file_stuff, which in turn needs the tag:
tag = eow::note_stream_startup_and_shutdown_actions dummy_cleaner;
global_file_stuff
=
GLOBAL_FILE_STUFF
{
filereader,
read_vector,
read_vector_mailop,
get_file_position,
#
is_closed => REF FALSE,
last_nextref => make_full_maildrop nextdrop,
clean_tag => tag
};
# * What should we do about the position in this case ?? *
# Suggestion: When building a stream with supplied initial data,
# nothing can be said about the positions inside that initial
# data (who knows where that data even came from!).
file_position
=
if (rv::length data == 0) get_file_position ();
else NULL;
fi;
buf = INPUT_BUFFER {
file_position, data,
global_file_stuff, nextdrop
};
stream = INPUT_STREAM (buf, 0);
# Swap the dummy action for the real cleanup:
eow::change_stream_startup_and_shutdown_actions
( tag,
\\ () = close_in_global_file_stuff global_file_stuff
);
stream;
};
# Detach the underlying filereader from a functional input stream.
#
# The stream is terminated first.  The result pairs the filereader
# with all data already buffered but not yet consumed: the unread
# tail of the current buffer (if any) followed by the contents of
# every buffer linked after it.
#
fun get_reader (INPUT_STREAM (buf, pos))
    =
    {   buf -> INPUT_BUFFER { data, global_file_stuff as GLOBAL_FILE_STUFF { filereader, ... }, nextdrop, ... };

        # Collect the data vectors of all buffers reachable through NEXT links:
        #
        fun chunks_after link
            =
            case (thk::get_from_maildrop link)
                #
                NEXT (INPUT_BUFFER { data => bytes, nextdrop => link', ... } )
                    =>
                    bytes ! chunks_after link';

                _   =>  [];
            esac;

        terminate global_file_stuff;

        unread_head
            =
            if (pos < rv::length data)   [ vec_extract (data, pos, NULL) ];
            else                         [];
            fi;

        (filereader, rv::cat (unread_head @ chunks_after nextdrop));
    };
/*
# * Position operations on instreams *
enum in_pos = INP of {
base: pos,
offset: Int,
global_file_stuff: global_file_stuff
}
*/
/*
fun getPosIn (INPUT_STREAM (buf, pos)) = case buf
of INPUT_BUFFER { basePos=NULL, global_file_stuff, ... } =>
inputExn (global_file_stuff, "getPosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED)
| INPUT_BUFFER { basePos=THE p, global_file_stuff, ... } => INP {
base = p, offset = pos, global_file_stuff = global_file_stuff
}
esac
*/
/*
fun filePosIn (INP { base, offset, ... } ) =
position.+(base, file_position::from_int offset)
*/
# Compute the file position corresponding to a stream position:
# the file position recorded for the buffer plus the offset within it.
# Raises iox::IO (cause RANDOM_ACCESS_IO_NOT_SUPPORTED) when the
# buffer has no recorded file position.
#
fun file_position_in (INPUT_STREAM (buf, pos))
    =
    {   buf -> INPUT_BUFFER { file_position => base, global_file_stuff => shared, ... };

        case base
            #
            THE start =>  file_position::(+) (start, file_position::from_int pos);
            NULL      =>  raise_io_exception (shared, "filePosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
        esac;
    };
/*
fun setPosIn (pos as INP { global_file_stuff as GLOBAL_FILE_STUFF { reader, ... }, ... } ) = let
fpos = filePosIn pos
my (drv::FILEREADER rd) = reader
in
terminate global_file_stuff;
the rd.setPos fpos;
make_instream (drv::FILEREADER rd, empty_vec)
end
*/
# IO mailop constructors:
# We exploit the "functional" nature of stream IO
# to implement the mailop constructors. These
# constructors spawn a thread to do the operation
# and write the result in an iVariable that
# serves as the synchronization value.
#
# NOTE: This implementation has the weakness that
# it prevents shutdown when everything else is
# deadlocked, since the thread that is spawned
# to actually do the IO could proceed.
stipulate
# Outcome of running an input operation: its value, or the exception it raised.
Result X = RES X
| EXCEPTION Exception;
# Turn a blocking input function into a mailop-returning one.
# The operation runs in a fresh thread which posts its outcome to a
# oneshot maildrop; the mailop yields the value or re-raises the exception.
fun do_input input_op
=
{ fun read arg
=
RES (input_op arg)
except
ex = EXCEPTION ex;
\\ arg
=
thk::dynamic_mailop
{.
reply_1shot = make_oneshot_maildrop ();
make_thread "binary I/O" {.
put_in_oneshot (reply_1shot, read arg);
};
get_from_oneshot' reply_1shot
==>
\\ (RES x) => x;
(EXCEPTION ex) => raise exception ex;
end;
};
};
herein
# These bindings replace the DIE-raising placeholders defined earlier in this package:
input1evt = do_input read_one;
input_mailop = do_input read;
input_nevt = do_input read_n;
input_all_mailop = do_input read_all;
end; # stipulate
# ** Output streams **
# An output stream is implemented
# as a monitor using an mvar to
# hold its data.
#
# Operations take the record out of the maildrop to lock the
# stream and put it back to release it.
Output_Stream_Info
=
OUTPUT_STREAM_INFO
{
buffer: wv::Rw_Vector, # Holds bytes not yet handed to the writer.
first_free_byte_in_buffer: Ref( Int ), # Number of bytes currently held in 'buffer'.
is_closed: Ref( Bool ), # Set to TRUE by close_output.
buffering_mode: Ref( iox::Buffering_Mode ),
filewriter: Filewriter, # The underlying driver-level writer.
write_rw_vector: wvs::Slice -> Void, # Writes an entire read-write slice.
write_vector: rvs::Slice -> Void, # Writes an entire read-only slice.
clean_tag: eow::Tag # Tag of the startup/shutdown actions registered for this stream.
};
Output_Stream
=
Maildrop( Output_Stream_Info );
# Raise iox::IO for an output stream.  The exception names the
# file via the filewriter's 'filename' field and records the
# failing operation 'ml_op' and the underlying 'cause'.
# (This rebinds the input-side function of the same name.)
#
fun raise_io_exception (info, ml_op, cause)
    =
    {   info -> OUTPUT_STREAM_INFO { filewriter => drv::FILEWRITER { filename, ... }, ... };
        #
        raise exception iox::IO { op => ml_op, name => filename, cause };
    };
# Acquire an output stream by taking its record out of the maildrop,
# and verify that the stream is still open.
#
# On success the record is returned and the caller is responsible for
# putting it back.  If the stream is closed the record is put back at
# once and iox::IO is raised with cause CLOSED_IO_STREAM.
#
fun lock_and_check_closed_out (strm_mv, ml_op)
    =
    {   info = take_from_maildrop strm_mv;

        info -> OUTPUT_STREAM_INFO { is_closed, ... };

        if (*is_closed)
            #
            put_in_maildrop (strm_mv, info);
            raise_io_exception (info, ml_op, iox::CLOSED_IO_STREAM);
        else
            info;
        fi;
    };
# Write out any bytes held in the stream's buffer and mark the buffer empty.
#
# Arguments:
#     strm_mv   The stream's maildrop; used only to release the stream on failure.
#     stream    The stream record, which the caller has taken from strm_mv.
#     ml_op     Operation name recorded in the iox::IO exception on failure.
#
# On failure the record is put back into strm_mv before iox::IO is raised,
# so callers must not release the stream again in that case.
#
fun flush_buffer (strm_mv, stream as OUTPUT_STREAM_INFO { buffer, first_free_byte_in_buffer, write_rw_vector, ... }, ml_op)
=
case *first_free_byte_in_buffer
#
0 => ();
n => { write_rw_vector (wvs::make_slice (buffer, 0, THE n));
#
first_free_byte_in_buffer := 0;
}
except
ex = {
put_in_maildrop (strm_mv, stream);
#
raise_io_exception (stream, ml_op, ex);
};
esac;
# Write vector v to an output stream.
#
# The stream is locked for the duration of the call.  With NO_BUFFERING,
# pending buffer contents and then v itself go straight to the writer.
# Otherwise v is copied into the buffer, which is written out whenever it
# fills; a vector at least as large as the buffer bypasses the buffer.
# A write failure releases the stream and raises iox::IO with op "write".
#
fun write (strm_mv, v)
=
{ (lock_and_check_closed_out (strm_mv, "write"))
->
(stream as OUTPUT_STREAM_INFO os);
fun release ()
=
put_in_maildrop (strm_mv, stream);
os -> { buffer, first_free_byte_in_buffer, buffering_mode, ... };
# Write out the filled part of the buffer (flush_buffer releases the stream itself on failure):
fun flush ()
=
flush_buffer (strm_mv, stream, "write");
# Write out the entire buffer; only called when the buffer has just been filled completely:
fun flush_all ()
=
os.write_rw_vector (wvs::make_full_slice buffer)
except
ex = { release();
#
raise_io_exception (stream, "write", ex);
};
# Write pending buffer contents, then v, directly to the writer:
fun write_direct ()
=
{ case *first_free_byte_in_buffer
#
0 => ();
#
n => { os.write_rw_vector (wvs::make_slice (buffer, 0, THE n));
#
first_free_byte_in_buffer := 0;
};
esac;
os.write_vector (rvs::make_full_slice v);
}
except
ex = { release();
#
raise_io_exception (stream, "write", ex);
};
# Buffered write; copy_vec (from, from_i, from_len, into, at) copies a piece of v into the buffer:
fun insert copy_vec
=
{ buf_len = wv::length buffer;
data_len = rv::length v;
if (data_len >= buf_len)
#
write_direct ();
else
i = *first_free_byte_in_buffer;
avail = buf_len - i;
if (avail < data_len)
#
# v does not fit: fill the buffer, write it out, and start again with the remainder of v.
copy_vec (v, 0, avail, buffer, i);
flush_all ();
copy_vec (v, avail, data_len-avail, buffer, 0);
first_free_byte_in_buffer := data_len-avail;
else
copy_vec (v, 0, data_len, buffer, i);
first_free_byte_in_buffer := i + data_len;
# v filled the buffer exactly, so write it out now:
if (avail == data_len) flush (); fi;
fi;
fi;
};
case *buffering_mode
#
iox::NO_BUFFERING
=>
write_direct ();
_ =>
insert copy_vec
where
fun copy_vec (from, from_i, from_len, into, at)
=
wvs::copy_vector { from => rvs::make_slice (from, from_i, THE from_len),
into,
at
};
end;
esac;
release ();
};
# Write a single element to an output stream.
#
# The stream is locked for the duration of the call.  With NO_BUFFERING
# the element is stored in slot 0 of the buffer and that one-element slice
# is written immediately.  Otherwise it is appended to the buffer, which is
# written out once full.  A write failure releases the stream and raises
# iox::IO with op "write_one".
#
fun write_one (strm_mv, element)
=
# The declarations in the 'where' part run first; release() is the final step.
release ()
where
(lock_and_check_closed_out (strm_mv, "write_one"))
->
(stream as OUTPUT_STREAM_INFO { buffer, first_free_byte_in_buffer, buffering_mode, write_rw_vector, ... } );
fun release ()
=
put_in_maildrop (strm_mv, stream);
case *buffering_mode
#
iox::NO_BUFFERING
=>
{ rw_vec_set (buffer, 0, element);
#
write_rw_vector (wvs::make_slice (buffer, 0, THE 1))
except
ex = { release();
raise_io_exception (stream, "write_one", ex);
};
};
_ =>
{ i = *first_free_byte_in_buffer;
#
i' = i+1;
rw_vec_set (buffer, i, element);
first_free_byte_in_buffer := i';
if (i' == wv::length buffer)
#
# flush_buffer releases the stream itself if the write fails:
flush_buffer (strm_mv, stream, "write_one");
fi;
};
esac;
end;
# Write out any buffered data of an open output stream.
# The stream is locked while flushing; if the flush fails,
# flush_buffer has already released the stream before raising.
#
fun flush strm_mv
    =
    {   locked = lock_and_check_closed_out (strm_mv, "flush");

        flush_buffer (strm_mv, locked, "flush");

        put_in_maildrop (strm_mv, locked);
    };
# Close an output stream.  Closing an already-closed stream is a no-op.
#
# For an open stream: buffered data is flushed, the stream is marked
# closed, its startup/shutdown actions are dropped, and the filewriter's
# close function is called.  The stream record is put back afterwards.
#
# NOTE(review): an exception from close() is neither wrapped in iox::IO nor
# followed by put_in_maildrop, so strm_mv would stay empty -- confirm
# whether that is intended.
# NOTE(review): the flush uses op name "close" rather than "close_output".
#
fun close_output strm_mv
=
{
(take_from_maildrop strm_mv)
->
(stream as OUTPUT_STREAM_INFO { filewriter => drv::FILEWRITER { close, ... }, is_closed, clean_tag, ... } );
if (not *is_closed)
#
# flush_buffer puts the stream back itself before raising on failure:
flush_buffer (strm_mv, stream, "close");
is_closed := TRUE;
eow::drop_stream_startup_and_shutdown_actions clean_tag;
close();
fi;
put_in_maildrop (strm_mv, stream);
};
# Build an output stream on top of a driver-level filewriter.
#
# Arguments:
#     wr     The driver writer; its best_io_quantum sets the buffer size.
#     mode   Initial buffering mode.
#
# Result: a full maildrop holding the new stream record.  A startup/shutdown
# action is registered which closes the stream via close_output.
#
fun make_outstream (wr as drv::FILEWRITER { best_io_quantum, write_rw_vector, write_vector, ... }, mode)
=
stream
where
# Turn a write function f, which returns a count n, into one which keeps
# calling f on the rest of the slice (from index n on) until the slice is empty.
# (Presumably n is the number of elements f actually wrote -- confirm against the driver API.)
fun iterate (f, size, subslice)
=
lp
where
fun lp sl
=
if (size sl != 0)
#
n = f sl;
lp (subslice (sl, n, NULL));
fi;
end;
write_rw_vector' = iterate (write_rw_vector, wvs::length, wvs::make_subslice);
write_vector' = iterate (write_vector, rvs::length, rvs::make_subslice);
# Install a dummy cleaner:
#
tag = eow::note_stream_startup_and_shutdown_actions dummy_cleaner;
stream = make_full_maildrop (
#
OUTPUT_STREAM_INFO
{
buffer => wv::make_rw_vector (best_io_quantum, some_element),
#
first_free_byte_in_buffer => REF 0,
is_closed => REF FALSE,
buffering_mode => REF mode,
filewriter => wr,
write_rw_vector => write_rw_vector',
write_vector => write_vector',
clean_tag => tag
}
);
# Now that the stream exists, replace the dummy cleaner with the real one:
eow::change_stream_startup_and_shutdown_actions (tag, \\ () = close_output stream);
end;
# Return the underlying filewriter of an open output stream
# together with the stream's current buffering mode.
# The stream is locked while the pair is read.
#
fun get_writer strm_mv
    =
    {   locked = lock_and_check_closed_out (strm_mv, "get_writer");

        locked -> OUTPUT_STREAM_INFO { filewriter, buffering_mode, ... };

        result = (filewriter, *buffering_mode);

        put_in_maildrop (strm_mv, locked);

        result;
    };
# Position operations on outstreams:
#
# An output position pairs a driver file position
# with the output stream it was obtained from.
#
Out_Position
=
OUT_POSITION
{
pos: drv::File_Position,
stream: Output_Stream
};
# Return the current position of an open output stream.
#
# Buffered data is flushed first so that the position reported by the
# filewriter accounts for everything written so far.
#
# Raises iox::IO with op "get_output_position" if the stream is closed,
# if flushing or querying the position fails, or (with cause
# RANDOM_ACCESS_IO_NOT_SUPPORTED) if the writer has no get_file_position.
#
# The stream is released exactly once on every path.  Previously the
# success path never put the stream back into its maildrop, because the
# trailing 'then release ()' was attached only to the raising fallback
# arm, which left the stream locked forever.  The closed-stream check
# also reported the unrelated op name "getWriter".
#
fun get_output_position strm_mv
    =
    {   (lock_and_check_closed_out (strm_mv, "get_output_position"))
            ->
            (stream as OUTPUT_STREAM_INFO { filewriter, ... } );

        fun release ()
            =
            put_in_maildrop (strm_mv, stream);

        # flush_buffer releases the stream itself before raising on failure:
        #
        flush_buffer (strm_mv, stream, "get_output_position");

        position
            =
            case filewriter
                #
                drv::FILEWRITER { get_file_position => THE f, ... }
                    =>
                    f ()
                    except
                        ex = {  release ();
                                #
                                raise_io_exception (stream, "get_output_position", ex);
                             };
                _   =>
                    {   release ();
                        #
                        raise_io_exception (stream, "get_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                    };
            esac;

        release ();

        OUT_POSITION { pos => position, stream => strm_mv };
    };
# Extract the file position from an output position.
# The owning stream is locked and released again first, which
# raises iox::IO if that stream has been closed.
#
fun file_pos_out out_position
    =
    {   out_position -> OUT_POSITION { pos => position, stream => strm_mv };

        locked = lock_and_check_closed_out (strm_mv, "file_pos_out");

        put_in_maildrop (strm_mv, locked);

        position;
    };
# Move an open output stream to a previously obtained position.
#
# Raises iox::IO with op "set_output_position" if the stream is closed,
# if the writer's set_file_position fails, or (with cause
# RANDOM_ACCESS_IO_NOT_SUPPORTED) if the writer has no set_file_position.
# The last case used to be reported under the wrong op name
# "get_output_position".
#
# The stream is locked for the duration of the call and released on
# every path.
#
# NOTE(review): buffered data is not flushed before seeking -- confirm
# that callers are expected to flush first.
#
fun set_output_position (OUT_POSITION { pos, stream=>strm_mv } )
    =
    {   (lock_and_check_closed_out (strm_mv, "set_output_position"))
            ->
            (stream as OUTPUT_STREAM_INFO { filewriter, ... } );

        fun release ()
            =
            put_in_maildrop (strm_mv, stream);

        case filewriter
            #
            drv::FILEWRITER { set_file_position=>THE f, ... }
                =>
                f pos
                except
                    ex = {  release ();
                            #
                            raise_io_exception (stream, "set_output_position", ex);
                         };
            _   =>
                {   release ();
                    #
                    raise_io_exception (stream, "set_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                };
        esac;

        release ();
    };
# Change the buffering mode of an open output stream.
# Switching to NO_BUFFERING first writes out whatever is
# currently buffered.  The stream is locked during the change.
#
fun set_buffering_mode (strm_mv, mode)
    =
    {   locked = lock_and_check_closed_out (strm_mv, "setBufferMode");

        locked -> OUTPUT_STREAM_INFO { buffering_mode => mode_ref, ... };

        case mode
            #
            iox::NO_BUFFERING =>  flush_buffer (strm_mv, locked, "setBufferMode");
            _                 =>  ();
        esac;

        mode_ref := mode;

        put_in_maildrop (strm_mv, locked);
    };
# Return the current buffering mode of an open output stream.
# The stream is locked while the mode is read.
#
fun get_buffering_mode strm_mv
    =
    {   # Should we be checking for closed streams here??? XXX BUGGO FIXME
        #
        locked = lock_and_check_closed_out (strm_mv, "getBufferMode");

        locked -> OUTPUT_STREAM_INFO { buffering_mode => mode_ref, ... };

        current_mode = *mode_ref;

        put_in_maildrop (strm_mv, locked);

        current_mode;
    };
}; # package pure_io
# Types of the imperative layer.  An imperative stream is a
# maildrop holding the current pure-layer (pur) stream value.
#
Vector = rv::Vector;
Element = rv::Element;
Input_Stream = Maildrop( pur::Input_Stream );
Output_Stream = Maildrop( pur::Output_Stream );
# Input operations:

# Read the next available chunk from an imperative input stream,
# advancing the stream past it.
#
fun read stream
    =
    {   before_read = take_from_maildrop stream;

        (pur::read before_read) -> (bytes, after_read);

        put_in_maildrop (stream, after_read);

        bytes;
    };
# Read one element from an imperative input stream.
#
# Result: THE element, with the stream advanced past it, or NULL at
# end of stream, in which case the stream is left where it was.
#
# The stream value is taken out of the maildrop for the duration of
# the read and is put back on both outcomes.  Previously the NULL case
# left the maildrop empty, so every later operation on the stream
# blocked.
#
fun read_one stream
    =
    {   current = take_from_maildrop stream;

        case (pur::read_one current)
            #
            THE (element, stream')
                =>
                {   put_in_maildrop (stream, stream');
                    #
                    THE element;
                };

            NULL
                =>
                {   put_in_maildrop (stream, current);      # Nothing was consumed; restore the stream unchanged.
                    #
                    NULL;
                };
        esac;
    };
# Read up to n elements from an imperative input stream,
# advancing the stream past whatever was read.
#
fun read_n (stream, n)
    =
    {   before_read = take_from_maildrop stream;

        (pur::read_n (before_read, n)) -> (bytes, after_read);

        put_in_maildrop (stream, after_read);

        bytes;
    };
# Read everything remaining on an imperative input stream,
# leaving the stream positioned at its end.
#
fun read_all (stream: Input_Stream)
    =
    {   before_read = take_from_maildrop stream;

        (pur::read_all before_read) -> (bytes, after_read);

        put_in_maildrop (stream, after_read);

        bytes;
    };
# Mailop-valued input operations are not implemented for the
# imperative layer: each of these raises DIE naming itself.
# (The last message used to read "input_ell_mailop".)
#
fun input1evt _ = raise exception DIE "input1evt unimplemented";
fun input_mailop _ = raise exception DIE "input_mailop unimplemented";
fun input_nevt _ = raise exception DIE "input_nevt unimplemented";
fun input_all_mailop _ = raise exception DIE "input_all_mailop unimplemented";
# Look at the next element without consuming it.
#
# We run a pure read_one on the current stream value and discard the
# advanced stream it returns; the maildrop is only read, never updated,
# so the stream position is unchanged.
#
fun peek (stream: Input_Stream)
    =
    {   snapshot = thk::get_from_maildrop stream;

        case (pur::read_one snapshot)
            #
            NULL             =>  NULL;
            THE (element, _) =>  THE element;
        esac;
    };
# Close an imperative input stream.
# The underlying pure stream is closed and the maildrop is then
# refilled with a stream positioned at the end of the buffered data.
#
fun close_input stream
    =
    {   current = take_from_maildrop stream;

        current -> pur::INPUT_STREAM (ibuf, _);

        pur::close_input current;

        put_in_maildrop (stream, pur::find_eos ibuf);
    };
# Test whether an imperative input stream is at end-of-stream.
# The maildrop is only read, not taken.
#
fun end_of_stream stream
    =
    {   snapshot = thk::get_from_maildrop stream;
        #
        pur::end_of_stream snapshot;
    };
/*
fun getPosIn stream = pur::getPosIn (mGet stream)
fun setPosIn (stream, p) = mUpdate (stream, pur::setPosIn p)
*/
# Output operations:
#
# Each of these reads (without taking) the pure output stream
# currently held by the imperative stream and applies the
# corresponding pur:: operation to it.

fun write (stream, v)
    =
    {   current = thk::get_from_maildrop stream;
        pur::write (current, v);
    };

fun write_one (stream, c)
    =
    {   current = thk::get_from_maildrop stream;
        pur::write_one (current, c);
    };

fun flush stream
    =
    {   current = thk::get_from_maildrop stream;
        pur::flush current;
    };

fun close_output stream
    =
    {   current = thk::get_from_maildrop stream;
        pur::close_output current;
    };

fun get_output_position stream
    =
    {   current = thk::get_from_maildrop stream;
        pur::get_output_position current;
    };
# Reposition an imperative output stream.
# The imperative stream is first redirected to the pure stream
# recorded inside the position, then that pure stream is moved.
#
fun set_output_position (stream, p)
    =
    {   p -> pur::OUT_POSITION { stream => target, ... };

        update_maildrop (stream, target);

        pur::set_output_position p;
    };
# Conversions between pure and imperative streams:
#     make_*   wrap a pure stream in a fresh full maildrop;
#     get_*    read the pure stream currently held (the maildrop stays full);
#     set_*    replace the pure stream held by the maildrop.
#
fun make_instream (stream: pur::Input_Stream) = make_full_maildrop stream;
fun get_instream (stream: Input_Stream) = thk::get_from_maildrop stream;
fun set_instream (stream: Input_Stream, stream') = update_maildrop (stream, stream');
fun make_outstream (stream: pur::Output_Stream) = make_full_maildrop stream;
fun get_outstream (stream: Output_Stream) = thk::get_from_maildrop stream;
fun set_outstream (stream: Output_Stream, stream') = update_maildrop (stream, stream');
# Open files

# Open the named file for binary input.
# Any exception raised while opening the file or building the
# stream is re-raised as iox::IO with op "open_for_read".
#
fun open_for_read filename
    =
    {   filereader = wxd::open_for_read filename;
        #
        make_instream (pur::make_instream (filereader, empty_vec));
    }
    except
        ex = raise exception iox::IO { op=>"open_for_read", name=>filename, cause=>ex };
# Open the named file for binary output, using block buffering.
# Any exception raised while opening the file or building the
# stream is re-raised as iox::IO.  The op name is now
# "open_for_write", matching open_for_read and open_for_append
# (it used to be the ambiguous "open").
#
fun open_for_write filename
    =
    make_outstream (pur::make_outstream (wxd::open_for_write filename, iox::BLOCK_BUFFERING))
    except
        ex = raise exception iox::IO { op=>"open_for_write", name=>filename, cause=>ex };
# Open the named file for binary output in append mode, unbuffered.
# Any exception raised while opening the file or building the
# stream is re-raised as iox::IO with op "open_for_append".
#
fun open_for_append filename
    =
    {   filewriter = wxd::open_for_append filename;
        #
        make_outstream (pur::make_outstream (filewriter, iox::NO_BUFFERING));
    }
    except
        ex = raise exception iox::IO { op=>"open_for_append", name=>filename, cause=>ex };
}; # package winix_data_file_for_os_g
end;
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.