PreviousUpNext

15.4.1114  src/lib/std/src/io/winix-data-file-for-os-g.pkg

## winix-data-file-for-os-g.pkg
#
# Generic package which combines platform-specific code wxd
# with platform-agnostic body code to produce a complete
# platform-specific binary-file package.
#
# This is the binary-file equivalent to
#
#     src/lib/std/src/io/winix-text-file-for-os-g.pkg
#
# It is the multithread alternative to
#
#     src/lib/std/src/io/winix-data-file-for-os-g--premicrothread.pkg
#
# This is the threadkit version of the binary_file generic package
#
#     src/lib/std/src/io/winix-data-file-for-os-g--premicrothread.pkg

# Compiled by:
#     src/lib/std/standard.lib




stipulate
    package dio =  winix_base_data_file_io_driver_for_posix;                            # winix_base_data_file_io_driver_for_posix              is from   src/lib/std/src/io/winix-base-data-file-io-driver-for-posix.pkg
    package thk =  threadkit;                                                           # threadkit                                             is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg
    #
    package iox =  io_exceptions;                                                       # io_exceptions                                         is from   src/lib/std/src/io/io-exceptions.pkg
    package eow =  io_startup_and_shutdown;             # "eow" == "end of world"       # io_startup_and_shutdown                               is from   src/lib/std/src/io/io-startup-and-shutdown.pkg
    #
    package wv  = rw_vector_of_one_byte_unts;                                           # rw_vector_of_one_byte_unts                            is from   src/lib/std/src/rw-vector-of-one-byte-unts.pkg
    package wvs = rw_vector_slice_of_one_byte_unts;                                     # rw_vector_slice_of_one_byte_unts                      is from   src/lib/std/src/rw-vector-slice-of-one-byte-unts.pkg
    #
    package rv  = vector_of_one_byte_unts;                                              # vector_of_one_byte_unts                               is from   src/lib/std/src/vector-of-one-byte-unts.pkg
    package rvs = vector_slice_of_one_byte_unts;                                        # vector_slice_of_one_byte_unts                         is from   src/lib/std/src/vector-slice-of-one-byte-unts.pkg
    #
    package pos = file_position;                                                        # file_position                                         is from   src/lib/std/file-position.pkg
herein

    # This generic is invoked (only) in:
    #     
    #     src/lib/std/src/posix/data-file.pkg
    #
    generic package   winix_data_file_for_os_g (
        #             ========================
        #                                                                               # Winix_Extended_File_Io_Driver_For_Os  is from   src/lib/std/src/io/winix-extended-file-io-driver-for-os.api
        package wxd                                                                     # "wxd" == "WiniX file io Driver"
            :
            Winix_Extended_File_Io_Driver_For_Os
                where  drv::Rw_Vector       == dio::Rw_Vector
                where  drv::Vector          == dio::Vector
                where  drv::Rw_Vector_Slice == dio::Rw_Vector_Slice
                where  drv::Vector_Slice    == dio::Vector_Slice
                where  drv::Element         == dio::Element
                where  drv::File_Position   == dio::File_Position
                where  drv::Filereader      == dio::Filereader
                where  drv::Filewriter      == dio::Filewriter;
    )

    : (weak)  Winix_Data_File_For_Os                                                    # Winix_Data_File_For_Os                                is from   src/lib/std/src/io/winix-data-file-for-os.api

    {
        include package   threadkit;                                                    # threadkit                                             is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg
        #
        package drv = wxd::drv;                                                         # wxd   is our argument.



        # Assign to a maildrop:
        #
        fun update_maildrop (mv, x)
            =
            {
                take_from_maildrop  mv;
                #
                put_in_maildrop (mv, x);
            };

        some_element = (0u0:  one_byte_unt::Unt);                                       # An element for initializing buffers:

        vec_extract =  rvs::to_vector o rvs::make_slice;                                # "vec" should be renamed to "vector" throughout here. XXX SUCKO FIXME.
        vec_get     =  rv::get;
        rw_vec_set  =  wv::set;
        empty_vec   =  rv::from_list [];

        fun dummy_cleaner () = ();

        package pur {                                                                   # "pur" is short for "pure" (I/O).
            #
            Vector   = rv::Vector;
            Element  = rv::Element;

            Filereader    =  drv::Filereader;
            Filewriter    =  drv::Filewriter;
            File_Position =  drv::File_Position;

            # ** Functional input streams **
            #
            Input_Stream
                =
                INPUT_STREAM  (Input_Buffer, Int)

            also
            Input_Buffer
                =
                INPUT_BUFFER
                  {
                    data:               Vector,
                    file_position:      Null_Or( File_Position ),

                    nextdrop:           Maildrop( Next ),                               # When this cell is empty, it means that 
                                                                                        # there is an outstanding request to the 
                                                                                        # server to extend the stream. 
                    global_file_stuff:  Global_File_Stuff
                  }

            also
            Next
              = NEXT  Input_Buffer                                                      # Forward link to additional data.
              | NO_NEXT                                                                 # Placeholder for forward link.
              | TERMINATED                                                              # Termination of the stream.

            also
            Global_File_Stuff
                =
                GLOBAL_FILE_STUFF
                  {
                    filereader:         Filereader,
                    read_vector:        Int -> Vector,
                    read_vector_mailop: Int -> thk::Mailop( Vector ),

                    is_closed:          Ref( Bool ),
                    get_file_position:  Void -> Null_Or( File_Position ),
                    last_nextref:       Maildrop(  Maildrop( Next ) ),                  # Points to the next cell of the last buffer.

                    clean_tag:          eow::Tag
                  };


            fun global_file_stuff_of_ibuf (INPUT_BUFFER { global_file_stuff, ... } )
                =
                global_file_stuff;


            fun best_io_quantum_of_ibuf buf
                =
                {   (global_file_stuff_of_ibuf  buf)
                        ->
                        GLOBAL_FILE_STUFF { filereader => drv::FILEREADER { best_io_quantum, ... }, ... };

                    best_io_quantum;
                };


            fun read_vector (INPUT_BUFFER { global_file_stuff => GLOBAL_FILE_STUFF { read_vector => f, ... }, ... } )
                =
                f;


            fun raise_io_exception (GLOBAL_FILE_STUFF { filereader => drv::FILEREADER { filename, ... }, ... }, ml_op, cause)
                =
                raise exception  iox::IO  { op => ml_op,  name => filename,  cause };


            Next_Data = EOF
                      | DATA  Input_Buffer
                      ;


            # Extend the stream by a chunk.
            # Invariant: the next m-variable is empty on entry and full on exit.
            #
            fun extend_stream (read_fn, ml_op, buf as INPUT_BUFFER { nextdrop, global_file_stuff, ... } )
                =
                {   global_file_stuff ->   GLOBAL_FILE_STUFF { get_file_position, last_nextref, ... };
                    #
                    file_position = get_file_position ();

                    chunk = read_fn  (best_io_quantum_of_ibuf  buf);

                    if (rv::length chunk == 0)
                        #
                        put_in_maildrop (nextdrop, NO_NEXT);
                        EOF;
                    else 
                        new_next = make_empty_maildrop ();

                        buf' = INPUT_BUFFER {
                                file_position,
                                data => chunk,
                                nextdrop => new_next,
                                global_file_stuff
                              };

                        # Note that we do not fill the new_next cell until
                        # after the last_nextref has been updated.  This ensures
                        # that someone attempting to access the last_nextref will
                        # not acquire the lock until after we are done.
                        #
                        update_maildrop (last_nextref, new_next);
                        put_in_maildrop (nextdrop,  NEXT buf');  #  releases lock!! 
                        put_in_maildrop (new_next, NO_NEXT);
                        DATA buf';
                    fi;
                }
                except
                    ex =    {
                                put_in_maildrop (nextdrop, NO_NEXT);
                                #
                                raise_io_exception (global_file_stuff, ml_op, ex);
                            };


            # Get the next buffer in the stream,
            # extending it if necessary. 
            # If the stream must be extended,
            # we lock it by taking the value from the
            # next cell; the extend_stream function
            # is responsible for filling in the cell.
            #
            fun get_next_buffer (read_fn, ml_op) (buf as INPUT_BUFFER { nextdrop, global_file_stuff, ... } )
                =
                get (thk::get_from_maildrop nextdrop)
                where
                    fun get TERMINATED  =>  EOF;
                        get (NEXT buf') =>  DATA buf';

                        get NO_NEXT
                            =>
                            case (take_from_maildrop  nextdrop)
                                #
                                NO_NEXT => extend_stream (read_fn, ml_op, buf);

                                other    => {
                                                put_in_maildrop (nextdrop, other);
                                                get other;
                                            };
                            esac;
                    end;
                end;

            # Read a chunk that is at least the specified size 
            #
            fun read_chunk buf
                =
                {   (global_file_stuff_of_ibuf buf)
                        ->
                        GLOBAL_FILE_STUFF  { read_vector,  filereader => drv::FILEREADER { best_io_quantum, ... }, ... };

                    case (best_io_quantum - 1)
                        #
                        0 => (\\ n = read_vector n);
                        #
                        k => (\\ n = read_vector (int::quot (n+k, best_io_quantum) * best_io_quantum));
                                    #
                                    # Round up to next multiple of best_io_quantum 
                    esac;
                };

            fun generalized_input get_buf
                =
                get
                where
                    fun get (INPUT_STREAM (buf as INPUT_BUFFER { data, ... }, pos))
                        =
                        {   len = rv::length data;
                            #
                            if (pos < len)
                                #
                                (vec_extract (data, pos, NULL), INPUT_STREAM (buf, len));
                            else
                                case (get_buf buf)
                                    #
                                    EOF       =>  (empty_vec, INPUT_STREAM (buf, len));
                                    DATA rest =>  get (INPUT_STREAM (rest, 0));
                                esac;
                            fi;
                        };
                end;

            # Terminate an input stream 
            #
            fun terminate (global_file_stuff as GLOBAL_FILE_STUFF { last_nextref, clean_tag, ... } )
                =
                {   m = thk::get_from_maildrop last_nextref;
                    #
                    case (take_from_maildrop  m)
                        #
                        (m' as NEXT _)
                            =>
                            {
                                put_in_maildrop (m, m');
                                terminate global_file_stuff;
                            };

                        TERMINATED
                            =>
                            put_in_maildrop (m, TERMINATED);

                       _ => {   eow::drop_stream_startup_and_shutdown_actions clean_tag;
                                put_in_maildrop (m, TERMINATED);
                            };
                    esac;
                };


            # Find the end of the stream 
            #
            fun find_eos (buf as INPUT_BUFFER { nextdrop, data, ... } )
                =
                case (thk::get_from_maildrop nextdrop)
                    #
                    NEXT buf =>  find_eos buf;
                    _        =>  INPUT_STREAM (buf, rv::length data);
                esac;


            fun read (stream as INPUT_STREAM (buf, _))
                =
                generalized_input
                    (get_next_buffer (read_vector buf, "read"))
                    stream;

            fun read_one (INPUT_STREAM (buf, pos))
                =
                {   buf ->  INPUT_BUFFER { data, nextdrop, ... };

                    if (pos < rv::length data)
                        #
                        THE (vec_get (data, pos), INPUT_STREAM (buf, pos+1));
                    else
                        get (thk::get_from_maildrop nextdrop)
                        where
                            fun get (NEXT buf) => read_one (INPUT_STREAM (buf, 0));
                                get TERMINATED => NULL;

                                get NO_NEXT
                                    =>
                                    case (take_from_maildrop  nextdrop)
                                        #
                                        NO_NEXT
                                            =>
                                            case (extend_stream (read_vector buf, "read_one", buf))
                                                DATA rest => read_one (INPUT_STREAM (rest, 0));
                                                EOF       => NULL;
                                            esac;

                                        other =>
                                            {
                                                put_in_maildrop (nextdrop, other);
                                                get other;
                                            };
                                    esac;
                            end;
                        end;
                    fi;
                };

            fun read_n (INPUT_STREAM (buf, pos), n)
                =
                {   fun join (item, (list, stream))
                        =
                        (item ! list, stream);

                    fun input_list (buf as INPUT_BUFFER { data, ... }, i, n)
                        =
                        {   len = rv::length data;
                            #
                            remain = len-i;

                            if (remain >= n)
                                 ([vec_extract (data, i, THE n)], INPUT_STREAM (buf, i+n));
                            else
                                 join ( vec_extract (data, i, NULL),
                                        next_buf (buf, n-remain)
                                      );
                            fi;
                          }

                    also
                    fun next_buf (buf as INPUT_BUFFER { nextdrop, data, ... }, n)
                        =
                        get (thk::get_from_maildrop nextdrop)
                        where
                            fun get (NEXT buf)
                                    =>
                                    input_list (buf, 0, n);

                                get TERMINATED
                                    =>
                                    ([], INPUT_STREAM (buf, rv::length data));

                                get NO_NEXT
                                    =>
                                    case (take_from_maildrop nextdrop)
                                        #
                                        NO_NEXT =>
                                            case (extend_stream (read_vector buf, "read_n", buf))
                                                #
                                                EOF         =>  ([], INPUT_STREAM (buf, rv::length data));
                                                (DATA rest) =>  input_list (rest, 0, n);
                                            esac;

                                        other=> {
                                                    put_in_maildrop (nextdrop, other);
                                                    get other;
                                                };
                                    esac;
                            end;
                        end;

                    (input_list (buf, pos, n))
                        ->
                        (data, stream);

                    (rv::cat data, stream);
                };

            fun read_all (stream as INPUT_STREAM (buf, _))
                =
                {   (global_file_stuff_of_ibuf  buf)
                        ->
                        GLOBAL_FILE_STUFF  { filereader => drv::FILEREADER { avail, ... }, ... };
                        

                    # Read a chunk that is as large as the available input.
                    # Note that for systems that use CR-LF for '\n', the
                    # size will be too large, but this should be okay.
                    #
                    fun big_chunk _
                        =
                        read_chunk  buf  delta
                        where
                            delta = case (avail())
                                        #
                                        NULL  =>  best_io_quantum_of_ibuf  buf;
                                        THE n =>  n;
                                    esac;
                        end;

                    big_input
                        =
                        generalized_input
                            (get_next_buffer (big_chunk, "read_all"));

                    fun loop (v, stream)
                        =
                        if (rv::length v == 0)  [];
                        else                   v ! loop (big_input stream);
                        fi;

                    data = rv::cat (loop (big_input stream));

                    (data, find_eos buf);
                };

            fun input1evt        _ =  raise exception DIE "input1Evt unimplemented";
            fun input_mailop     _ =  raise exception DIE "inputEvt unimplemented";
            fun input_nevt       _ =  raise exception DIE "inputNEvt unimplemented";
            fun input_all_mailop _ =  raise exception DIE "inputAllEvt unimplemented";

            # Close an input stream given its global_file_stuff record.
            # We need this function for the cleanup hook
            # to avoid a space leak.
            #
            fun close_in_global_file_stuff (GLOBAL_FILE_STUFF { is_closed=>REF TRUE, ... } )
                    =>
                    ();

                close_in_global_file_stuff (global_file_stuff as GLOBAL_FILE_STUFF { is_closed, filereader => drv::FILEREADER { close, ... }, ... } )
                    =>
                    {
    # ** We need some kind of lock on the input stream to do this safely!!! ** XXX BUGGO FIXME
                        terminate global_file_stuff;

                        is_closed := TRUE;

                        close()
                        except
                            ex =  raise_io_exception (global_file_stuff, "close_input", ex);
                    };
            end;


            fun close_input (INPUT_STREAM (buf, _))
                =
                close_in_global_file_stuff (global_file_stuff_of_ibuf buf);


            fun end_of_stream (INPUT_STREAM (buf as INPUT_BUFFER { nextdrop, ... }, pos))
                =
                case (take_from_maildrop nextdrop)
                    #
                    other as NEXT _
                        =>
                        {
                            put_in_maildrop (nextdrop, other);
                            #
                            FALSE;
                        };

                    other
                        =>
                        {   buf ->  INPUT_BUFFER { data, global_file_stuff=>GLOBAL_FILE_STUFF { is_closed, ... }, ... };
                            #
                            if (pos == rv::length data)
                                #
                                case (other, *is_closed)
                                    #
                                    (NO_NEXT, FALSE)
                                        =>
                                        case (extend_stream (read_vector buf, "end_of_stream", buf))
                                            #
                                            EOF =>  TRUE;
                                            _   =>  FALSE;
                                        esac;

                                    _   =>
                                        {
                                            put_in_maildrop (nextdrop, other);
                                            TRUE;
                                        };
                                esac;
                            else
                                put_in_maildrop (nextdrop, other);
                                FALSE;
                            fi;
                       };
                esac;


            fun make_instream (filereader, data)
                =
                {   filereader ->  drv::FILEREADER { read_vector, read_vector_mailop, get_file_position, set_file_position, ... };
                    #
                    get_file_position
                        =
                        case (get_file_position, set_file_position)
                            #
                            (THE f, THE _)
                                =>
                                (\\ () =  THE (f ()));

                             _   =>
                                 (\\ () =  NULL);
                         esac;

                    nextdrop =  make_full_maildrop  NO_NEXT;

                    tag      =  eow::note_stream_startup_and_shutdown_actions  dummy_cleaner;

                    global_file_stuff
                        =
                        GLOBAL_FILE_STUFF
                          {
                            filereader,
                            read_vector,
                            read_vector_mailop,
                            get_file_position,
                            #
                            is_closed    =>  REF FALSE,
                            last_nextref =>  make_full_maildrop nextdrop,
                            clean_tag    =>  tag
                          };

                    # * What should we do about the position in this case ?? *

                    # Suggestion: When building a stream with supplied initial data,
                    # nothing can be said about the positions inside that initial
                    # data (who knows where that data even came from!).


                    file_position
                        =
                        if (rv::length data == 0)    get_file_position ();
                        else                         NULL;
                        fi;

                    buf = INPUT_BUFFER {
                            file_position, data,
                            global_file_stuff, nextdrop
                          };

                    stream =  INPUT_STREAM (buf, 0);

                    eow::change_stream_startup_and_shutdown_actions
                      ( tag,
                        \\ () =  close_in_global_file_stuff global_file_stuff
                      );

                    stream;
                };

            fun get_reader (INPUT_STREAM (buf, pos))
                =
                {   buf ->  INPUT_BUFFER { data, global_file_stuff as GLOBAL_FILE_STUFF { filereader, ... }, nextdrop, ... };
                    #
                    fun get_data nextdrop
                        =
                        case (thk::get_from_maildrop nextdrop)
                            #
                            NEXT (INPUT_BUFFER { data, nextdrop=>nextdrop', ... } )
                                =>
                                data ! get_data nextdrop';

                             _ => [];
                        esac;


                    terminate global_file_stuff;

                    if (pos < rv::length data)   (filereader,   rv::cat (vec_extract (data, pos, NULL) ! get_data nextdrop));
                    else                         (filereader,   rv::cat (                                get_data nextdrop));
                    fi;
                };

    /*
          # * Position operations on instreams *
            enum in_pos = INP of {
                base:  pos,
                offset:  Int,
                global_file_stuff:  global_file_stuff
              }
    */

    /*
            fun getPosIn (INPUT_STREAM (buf, pos)) = case buf
                   of INPUT_BUFFER { basePos=NULL, global_file_stuff, ... } =>
                        inputExn (global_file_stuff, "getPosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED)
                    | INPUT_BUFFER { basePos=THE p, global_file_stuff, ... } => INP {
                          base = p, offset = pos, global_file_stuff = global_file_stuff
                        }
                  esac
    */


    /*
            fun filePosIn (INP { base, offset, ... } ) =
                  position.+(base, file_position::from_int offset)
    */

            fun file_position_in (INPUT_STREAM (buf, pos))
                =
                case buf
                    #
                    INPUT_BUFFER { file_position=>NULL, global_file_stuff, ... }
                        =>
                        raise_io_exception (global_file_stuff, "filePosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);

                    INPUT_BUFFER { file_position=>THE b, ... }
                        =>
                        file_position::(+) (b, file_position::from_int pos);
                esac;
    /*
            fun setPosIn (pos as INP { global_file_stuff as GLOBAL_FILE_STUFF { reader, ... }, ... } ) = let
                  fpos = filePosIn pos
                  my (drv::FILEREADER rd) = reader
                  in
                    terminate global_file_stuff;
                    the rd.setPos fpos;
                    make_instream (drv::FILEREADER rd, empty_vec)
                  end
    */

            # IO mailop constructors:
            # We exploit the "functional" nature of stream IO
            # to implement the mailop constructors.  These
            # constructors spawn a thread to do the operation
            # and and write the result in an iVariable that
            # serves as the synchronization value.
            #
            # NOTE: This implementation has the weakness that
            # it prevents shutdown when everything else is
            # deadlocked, since the thread that is spawned
            # to actually do the IO could proceed.

            stipulate

                Result X = RES  X | EXCEPTION  Exception;

                fun do_input input_op
                    =
                    {   fun read arg
                            =
                            RES (input_op arg)
                            except
                                ex = EXCEPTION ex;

                        \\ arg
                            =
                            thk::dynamic_mailop
                               {.
                                    reply_1shot =  make_oneshot_maildrop ();

                                    make_thread "binary I/O" {.
                                        put_in_oneshot (reply_1shot, read arg);
                                    };

                                    get_from_oneshot'  reply_1shot
                                        ==>
                                        \\ (RES x)        =>  x;
                                           (EXCEPTION ex) =>  raise exception ex;
                                        end;

                                };

                    };
            herein

                input1evt        =  do_input  read_one;
                input_mailop     =  do_input  read;

                input_nevt       =  do_input  read_n;
                input_all_mailop =  do_input  read_all;
            end;                                                # stipulate


            # ** Output streams **

            # An output stream is implemented
            # as a monitor using an mvar to
            # hold its data.

            Output_Stream_Info
                =
                OUTPUT_STREAM_INFO
                  {
                    buffer:                     wv::Rw_Vector,
                    first_free_byte_in_buffer:  Ref( Int ),

                    is_closed:                  Ref( Bool ),
                    buffering_mode:             Ref( iox::Buffering_Mode ),

                    filewriter:                 Filewriter,

                    write_rw_vector:            wvs::Slice -> Void,
                    write_vector:               rvs::Slice -> Void,

                    clean_tag:  eow::Tag
                  };

            Output_Stream
                =
                Maildrop( Output_Stream_Info );

            fun raise_io_exception (OUTPUT_STREAM_INFO { filewriter => drv::FILEWRITER { filename, ... }, ... }, ml_op, cause)
                =
                raise exception  iox::IO  { op => ml_op,  name => filename,  cause };


            # Lock access to the stream and
            # make sure that it is not closed. 
            #
            fun lock_and_check_closed_out (strm_mv, ml_op)
                =
                case (take_from_maildrop  strm_mv)
                    #
                    (stream as OUTPUT_STREAM_INFO( { is_closed => REF TRUE, ... } ))
                        =>
                        {
                            put_in_maildrop (strm_mv, stream);
                            #
                            raise_io_exception (stream, ml_op, iox::CLOSED_IO_STREAM);
                        };

                    stream => stream;
                esac;


            fun flush_buffer (strm_mv, stream as OUTPUT_STREAM_INFO { buffer, first_free_byte_in_buffer, write_rw_vector, ... }, ml_op)
                =
                case *first_free_byte_in_buffer
                    #
                    0 => ();

                    n => {   write_rw_vector (wvs::make_slice (buffer, 0, THE n));
                             #
                             first_free_byte_in_buffer := 0;
                         }
                         except
                             ex =  {
                                        put_in_maildrop (strm_mv, stream);
                                        #
                                        raise_io_exception (stream, ml_op, ex);
                                   };
                esac;


            fun write (strm_mv, v)
                =
                {   (lock_and_check_closed_out (strm_mv, "write"))
                        ->
                        (stream as OUTPUT_STREAM_INFO os);

                    fun release ()
                        =
                        put_in_maildrop (strm_mv, stream);

                    os ->  { buffer, first_free_byte_in_buffer, buffering_mode, ... };


                    fun flush ()
                        =
                        flush_buffer (strm_mv, stream, "write");

                    fun flush_all ()
                        =
                        os.write_rw_vector  (wvs::make_full_slice  buffer)
                        except
                            ex =  {   release();
                                      #
                                      raise_io_exception (stream, "write", ex);
                                  };


                    fun write_direct ()
                        =
                        {   case *first_free_byte_in_buffer
                                #
                                0 => ();
                                #
                                n => {   os.write_rw_vector  (wvs::make_slice  (buffer, 0, THE n));
                                         #
                                         first_free_byte_in_buffer := 0;
                                     };
                            esac;

                            os.write_vector  (rvs::make_full_slice  v);
                        }
                        except
                            ex =  {   release();
                                      #
                                      raise_io_exception  (stream, "write", ex);
                                  };

                    fun insert copy_vec
                        =
                        {   buf_len  =  wv::length  buffer;
                            data_len =  rv::length  v;

                            if (data_len >= buf_len)
                                #
                                write_direct ();
                            else
                                i = *first_free_byte_in_buffer;

                                avail = buf_len - i;

                                if (avail < data_len)
                                    #
                                    copy_vec (v, 0, avail, buffer, i);

                                    flush_all ();

                                    copy_vec (v, avail, data_len-avail, buffer, 0);

                                    first_free_byte_in_buffer := data_len-avail;
                                else
                                    copy_vec (v, 0, data_len, buffer, i);

                                    first_free_byte_in_buffer :=  i + data_len;

                                    if (avail == data_len)   flush ();   fi;
                                fi;
                            fi;
                       };

                    case *buffering_mode
                        #
                        iox::NO_BUFFERING
                            =>
                            write_direct ();

                        _   =>
                            insert copy_vec
                            where
                                fun copy_vec (from, from_i, from_len, into, at)
                                    =
                                    wvs::copy_vector  { from =>  rvs::make_slice (from, from_i, THE from_len),
                                                        into,
                                                        at
                                                      };

                            end;
                    esac;

                    release ();
                };

            fun write_one (strm_mv, element)
                =
                release ()
                where
                    (lock_and_check_closed_out (strm_mv, "write_one"))
                        ->
                        (stream as OUTPUT_STREAM_INFO { buffer, first_free_byte_in_buffer, buffering_mode, write_rw_vector, ... } );

                    fun release ()
                        =
                        put_in_maildrop (strm_mv, stream);

                    case *buffering_mode
                        #
                        iox::NO_BUFFERING
                            =>
                            {   rw_vec_set (buffer, 0, element);
                                #
                                write_rw_vector (wvs::make_slice (buffer, 0, THE 1))
                                except
                                    ex =  {   release();
                                              raise_io_exception (stream, "write_one", ex);
                                          };
                            };

                         _   =>
                            {   i = *first_free_byte_in_buffer;
                                #
                                i' = i+1;

                                rw_vec_set (buffer, i, element);

                                first_free_byte_in_buffer := i';

                                if (i' == wv::length  buffer)
                                    #
                                    flush_buffer (strm_mv, stream, "write_one");
                                fi;
                            };
                    esac;
                end;

            fun flush strm_mv
                 =
                 {   stream =  lock_and_check_closed_out (strm_mv, "flush");
                     #  
                     flush_buffer (strm_mv, stream, "flush");
                     #  
                     put_in_maildrop (strm_mv, stream);
                 };

            fun close_output  strm_mv
                =
                {
                    (take_from_maildrop  strm_mv)
                        ->
                        (stream as OUTPUT_STREAM_INFO  { filewriter => drv::FILEWRITER { close, ... },  is_closed,  clean_tag, ... } );

                    if (not *is_closed)
                        #
                        flush_buffer (strm_mv, stream, "close");
                        is_closed := TRUE;
                        eow::drop_stream_startup_and_shutdown_actions clean_tag;
                        close();
                    fi;

                    put_in_maildrop (strm_mv, stream);
                };

            fun make_outstream  (wr as drv::FILEWRITER { best_io_quantum, write_rw_vector, write_vector, ... },  mode)
                =
                stream
                where
                    fun iterate (f, size, subslice)
                        =
                        lp
                        where
                            fun lp sl
                                =
                                if (size sl != 0) 
                                    #
                                    n = f sl;

                                    lp (subslice (sl, n, NULL));
                                fi;
                        end;

                    write_rw_vector' = iterate (write_rw_vector, wvs::length, wvs::make_subslice);
                    write_vector'    = iterate (write_vector,    rvs::length, rvs::make_subslice);

                    # Install a dummy cleaner:
                    #
                    tag = eow::note_stream_startup_and_shutdown_actions dummy_cleaner;

                    stream =    make_full_maildrop (
                                    #
                                    OUTPUT_STREAM_INFO
                                      {
                                        buffer          =>  wv::make_rw_vector (best_io_quantum, some_element),
                                        #
                                        first_free_byte_in_buffer       =>  REF 0,

                                        is_closed       =>  REF FALSE,
                                        buffering_mode  =>  REF mode,

                                        filewriter      =>  wr,
                                        write_rw_vector =>  write_rw_vector',
                                        write_vector    =>  write_vector',
                                        clean_tag       =>  tag
                                      }
                              );

                    eow::change_stream_startup_and_shutdown_actions  (tag,  \\ () = close_output stream);
                end;


            fun get_writer strm_mv
                =
                {   (lock_and_check_closed_out (strm_mv, "get_writer"))
                        ->
                        (stream as OUTPUT_STREAM_INFO { filewriter, buffering_mode, ... } );

                    (filewriter, *buffering_mode)
                    then
                        put_in_maildrop (strm_mv, stream);
                };

            # Position operations on outstreams:
            #
            Out_Position
                =
                OUT_POSITION
                  {
                    pos:        drv::File_Position,
                    stream:     Output_Stream
                  };

            fun get_output_position strm_mv
                =
                {   (lock_and_check_closed_out (strm_mv, "getWriter"))
                        ->
                        (stream as OUTPUT_STREAM_INFO { filewriter, ... } );

                    fun release ()
                        =
                        put_in_maildrop (strm_mv, stream);

                    flush_buffer (strm_mv, stream, "get_output_position");

                    case filewriter
                        #
                        drv::FILEWRITER { get_file_position => THE f, ... }
                            =>
                            OUT_POSITION { pos => f(), stream => strm_mv }
                            except
                                ex =    {   release();
                                            raise_io_exception (stream, "get_output_position", ex);
                                        };
                        _   =>  {   release();
                                    raise_io_exception (stream, "get_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                                }
                                then
                                    release ();
                    esac;
                };

            fun file_pos_out (OUT_POSITION { pos, stream=>strm_mv } )
                =
                {
                    put_in_maildrop (strm_mv, lock_and_check_closed_out (strm_mv, "file_pos_out"));
                    #
                    pos;
                };

            fun set_output_position (OUT_POSITION { pos, stream=>strm_mv } )
                =
                {   (lock_and_check_closed_out (strm_mv, "set_output_position"))
                        ->
                        (stream as OUTPUT_STREAM_INFO { filewriter, ... } );

                    fun release ()
                        =
                        put_in_maildrop (strm_mv, stream);

                    case filewriter
                        #
                        drv::FILEWRITER { set_file_position=>THE f, ... }
                            => 
                            f pos
                            except
                                ex =    {   release ();
                                            #
                                            raise_io_exception (stream, "set_output_position", ex);
                                        };
                        _   =>
                            {   release ();
                                #
                                raise_io_exception (stream, "get_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                            };
                    esac;

                    release ();
                };

            fun set_buffering_mode (strm_mv, mode)
                =
                {   (lock_and_check_closed_out (strm_mv, "setBufferMode"))
                        ->
                        (stream as OUTPUT_STREAM_INFO { buffering_mode, ... } );

                    if (mode == iox::NO_BUFFERING)
                        #                   
                        flush_buffer (strm_mv, stream, "setBufferMode");
                    fi;

                    buffering_mode := mode;

                    put_in_maildrop (strm_mv, stream);
                };

            fun get_buffering_mode  strm_mv
                =
                {   # Should we be checking for closed streams here???  XXX BUGGO FIXME
                    #
                    (lock_and_check_closed_out (strm_mv, "getBufferMode"))
                        ->
                        (stream as OUTPUT_STREAM_INFO { buffering_mode, ... } );

                    *buffering_mode
                    then
                        put_in_maildrop (strm_mv, stream);
                };

        };              # package pure_io 

        Vector  =  rv::Vector;
        Element =  rv::Element;

        Input_Stream  =  Maildrop( pur::Input_Stream  );
        Output_Stream =  Maildrop( pur::Output_Stream );



        # Input operations:

        fun read stream
            =
            {
                (pur::read (take_from_maildrop stream))
                    ->
                    (v, stream');

                put_in_maildrop (stream, stream');

                v;
            };

        fun read_one stream
            =
            case (pur::read_one (take_from_maildrop stream))
                #
                THE (element, stream')
                    =>
                    {
                        put_in_maildrop (stream, stream');
                        #
                        THE element;
                    };

                NULL => NULL;
            esac;


        fun read_n (stream, n)
            =
            {
                (pur::read_n (take_from_maildrop stream, n))
                    ->
                    (v, stream');

                put_in_maildrop (stream, stream');

                v;
            };

        fun read_all (stream:  Input_Stream)
            =
            {
                (pur::read_all (take_from_maildrop stream))
                    ->
                    (v, stream');

                put_in_maildrop (stream, stream'); v;
            };

        fun input1evt        _ =  raise exception  DIE "input1evt unimplemented";
        fun input_mailop     _ =  raise exception  DIE "input_mailop unimplemented";
        fun input_nevt       _ =  raise exception  DIE "input_nevt unimplemented";
        fun input_all_mailop _ =  raise exception  DIE "input_ell_mailop unimplemented";

        fun peek (stream:  Input_Stream)
            =
            case (pur::read_one (thk::get_from_maildrop stream))                                        # We do a pure read_one and then discard the new stream. Since pure reads have no side effects, this is just the ticket.
                #
                THE (element, _) => THE element;
                #
                NULL => NULL;
            esac;

        fun close_input stream
            =
            {
                (take_from_maildrop  stream)
                    ->
                    (s as pur::INPUT_STREAM (buf as pur::INPUT_BUFFER { data, ... }, _));

                pur::close_input  s;

                put_in_maildrop (stream, pur::find_eos buf);
            };

        fun end_of_stream stream
            =
            pur::end_of_stream (thk::get_from_maildrop stream);
    /*
        fun getPosIn stream = pur::getPosIn (mGet stream)
        fun setPosIn (stream, p) = mUpdate (stream, pur::setPosIn p)
    */


        # Output operations:

        fun write (stream, v)     =  pur::write     (thk::get_from_maildrop stream, v);
        fun write_one (stream, c) =  pur::write_one (thk::get_from_maildrop stream, c);

        fun flush        stream =  pur::flush        (thk::get_from_maildrop stream);
        fun close_output stream =  pur::close_output (thk::get_from_maildrop stream);

        fun get_output_position stream = pur::get_output_position (thk::get_from_maildrop stream);

        fun set_output_position (stream, p as pur::OUT_POSITION { stream=>stream', ... } )
            =
            {   update_maildrop (stream, stream');
                pur::set_output_position p;
            };

        fun make_instream (stream:  pur::Input_Stream)      =  make_full_maildrop stream;
        fun get_instream  (stream:  Input_Stream)           =  thk::get_from_maildrop stream;
        fun set_instream  (stream:  Input_Stream, stream')  =  update_maildrop (stream, stream');

        fun make_outstream (stream:  pur::Output_Stream)    =  make_full_maildrop stream;
        fun get_outstream (stream:  Output_Stream)          =  thk::get_from_maildrop stream;
        fun set_outstream (stream:  Output_Stream, stream') =  update_maildrop (stream, stream');



        # Open files

        fun open_for_read  filename
            =
            make_instream (pur::make_instream (wxd::open_for_read filename, empty_vec))
            except
                ex =  raise exception iox::IO { op=>"open_for_read", name=>filename, cause=>ex };

        fun open_for_write  filename
            =
            make_outstream (pur::make_outstream (wxd::open_for_write filename, iox::BLOCK_BUFFERING))
            except
                ex =  raise exception iox::IO { op=>"open", name=>filename, cause=>ex };

        fun open_for_append  filename
            =
            make_outstream (pur::make_outstream (wxd::open_for_append filename, iox::NO_BUFFERING))
            except
                ex =  raise exception iox::IO { op=>"open_for_append", name=>filename, cause=>ex };

    };                                                                                  # package winix_data_file_for_os_g 
end;

## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext