PreviousUpNext

15.4.1672  src/lib/x-kit/xclient/src/wire/outbuf-ximp.pkg

## outbuf-ximp.pkg
#
# For the big picture see the imp dataflow diagrams in
#
#     src/lib/x-kit/xclient/src/window/xclient-ximps.pkg

# Compiled by:
#     src/lib/x-kit/xclient/xclient-internals.sublib




                                                                # xevent_types                          is from   src/lib/x-kit/xclient/src/wire/xevent-types.pkg
                                                                # xerrors                               is from   src/lib/x-kit/xclient/src/wire/xerrors.pkg

stipulate
    include package   threadkit;                                # threadkit                             is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg
    #
    #
    package op  =  xsequencer_to_outbuf;                        # xsequencer_to_outbuf                  is from   src/lib/x-kit/xclient/src/wire/xsequencer-to-outbuf.pkg
    package skj =  socket_junk;                                 # socket_junk                           is from   src/lib/internet/socket-junk.pkg
    package sok =  socket__premicrothread;                      # socket__premicrothread                is from   src/lib/std/socket--premicrothread.pkg
    package v1u =  vector_of_one_byte_unts;                     # vector_of_one_byte_unts               is from   src/lib/std/src/vector-of-one-byte-unts.pkg
    package iox =  io_exceptions;                               # io_exceptions                         is from   src/lib/std/src/io/io-exceptions.pkg
    #
    package xtr =  xlogger;                                     # xlogger                               is from   src/lib/x-kit/xclient/src/stuff/xlogger.pkg
    #
    trace =  xtr::log_if  xtr::io_logging  0;                   # Conditionally write strings to tracing.log or whatever.


    #
    fun exception_message (iox::IO { cause, op, name } )
            =>
            {   cause_message
                    =
                    case cause
                        #
                        iox::BLOCKING_IO_NOT_SUPPORTED      => [", blocking I/O not supported"];
                        iox::RANDOM_ACCESS_IO_NOT_SUPPORTED => [", random access not supported"];
                        iox::TERMINATED_INPUT_STREAM        => [", terminated input stream"];
                        iox::CLOSED_IO_STREAM               => [", closed stream"];
                        _                                       => [" with exception ", exception_message cause];
                   esac;

                cat ("Io: " ! op ! " failed on \"" ! name ! "\"" ! cause_message);
            };

        exception_message (DIE s)            => "DIE: " + s;
        exception_message BIND                => "nonexhaustive naming failure";     # NOTE: we should probably include line/file info for MATCH and BIND  XXX BUGGO FIXME
        exception_message MATCH               => "nonexhaustive match failure";
        exception_message INDEX_OUT_OF_BOUNDS => "index out of bounds";
        exception_message SIZE                => "size";
        exception_message OVERFLOW            => "overflow";
        exception_message DIVIDE_BY_ZERO      => "divide by zero";
        exception_message DOMAIN              => "domain error";
        exception_message e                   => "unknown";
    end;

    # Convert "abc" -> "61.62.63." etc:
    #
    fun string_to_hex s
        =
        string::translate
            (\\ c =  number_string::pad_left '0' 2 (int::format number_string::HEX (char::to_int c)) + ".")
             s;

    # As above, starting with byte-vector:
    #
    fun bytes_to_hex  bytes
        =
        string_to_hex (byte::unpack_string_vector(vector_slice_of_one_byte_unts::make_slice (bytes, 0, NULL)));

    # Show printing chars verbatim, everything
    # else as '.', per hexdump tradition:
    #
    fun string_to_ascii s
        =
        string::translate
            (\\ c =  char::is_print c  ??  string::from_char c  ::  ".")
            s;

    # As above, starting with byte-vector:
    #
    fun bytes_to_ascii  bytes
        =
        string_to_ascii (byte::unpack_string_vector (vector_slice_of_one_byte_unts::make_slice (bytes, 0, NULL)));


    max_chars_to_trace_per_send = THE 10000;

    fun out_vector_to_string  v
        =
        {
foo = vector_slice_of_one_byte_unts::make_full_slice v
      except
          x = {
log::note_on_stderr {. "out_vector_to_string/AAA000 -- outbuf-ximp.pkg\n"; };
exception_msg = exception_message x;
log::note_on_stderr {. "out_vector_to_string/AAA111 " + exception_msg + " -- outbuf-ximp.pkg\n"; };
                raise exception x;
              };

            prefix_to_show
                =
                byte::unpack_string_vector
                    (
                        foo
                    );


            case max_chars_to_trace_per_send
                #
                THE n =>    {
                                as_hex   = string_to_hex    prefix_to_show;
                                as_ascii = string_to_ascii  prefix_to_show;
                                len      =  (v1u::length v);
                                len      = int::to_string len;
                                cat [ "Sent to X server: ",   as_hex,
                                      "... == \"",            as_ascii,
                                      "\"... (", len, " bytes -- outbuf-ximp.pkg)\n"
                                    ];
                            };
                NULL =>     {
                                cat [ "Sent to X server: ",    string_to_hex    prefix_to_show,
                                      " == \"",               string_to_ascii  prefix_to_show,
                                      "\"  (", int::to_string (v1u::length v), " bytes -- outbuf-ximp.pkg)\n"
                                    ];
                            };
            esac;
        };      

herein

    # This imp is typically instantiated by:
    #
    #     src/lib/x-kit/xclient/src/wire/xsocket-ximps.pkg

    package   outbuf_ximp
    : (weak)  Outbuf_Ximp                                       # Outbuf_Ximp                           is from   src/lib/x-kit/xclient/src/wire/outbuf-ximp.api
    {
        Outbuf_State                                            # Holds all nonephemeral mutable state maintained by ximp.
            =
            Ref( Void );

        Imports = {  };                                         # PUBLIC. Ports we use, provided by other imps.

        Me_Slot(X)
            =
            Mailslot(  {   imports:     Imports,
                            me:         Outbuf_State,
                            run_gun':   Run_Gun,
                            end_gun':   End_Gun,
                            socket:     sok::Socket (X, sok::Stream(sok::Active))                                       # Socket to read.
                        }
                    );
                                                                                                                        # NB: We've eliminated the Foo_Plea type from other imps in favor of just passing
                                                                                                                        #     Runstate -> Void thunks through the mailqueue, but we've retained them here
                                                                                                                        #     because we want to batch-process our message-queue contents, which requires
                                                                                                                        #     that we have more visibility into the queue contents than allowed by thunks.
        Outbuf_Plea = SEND_BYTEVECTOR        v1u::Vector                                                                #
                    | SEND_BYTEVECTORS List( v1u::Vector )
                    | FLUSH (Void -> Void)
                    ;
        Outbuf_Q    = Mailqueue( Outbuf_Plea );


        Exports   = {                                                                                                   # PUBLIC.
                      xsequencer_to_outbuf:     op::Xsequencer_To_Outbuf
                    };

        Option    =  MICROTHREAD_NAME String;                                                                           # PUBLIC.

        Outbuf_Egg =  Void -> (Exports,   (Imports, Run_Gun, End_Gun) -> Void);                                         # PUBLIC.



        fun run {                                                                                                       # These values will be statically globally visible throughout the code body for the imp.
                  me:                   Outbuf_State,                                                                   # 
                  imports:              Imports,                                                                        # Ximps to which we send requests.
                  to:                   Replyqueue,                                                                     # The name makes   foo::pass_something(imp) to {. ... }   syntax read well.
                  end_gun':             End_Gun,                                                                        # We shut down the microthread when this fires.
                  outbuf_q:             Outbuf_Q,                                                                       # 
                  socket:               sok::Socket (X, sok::Stream(sok::Active))                                       # Socket to read.
                }
            =
            {
                loop ();
            }
            where
                fun loop ()                                                                                             # Outer loop for the imp.
                    =
                    {
                        do_one_mailop' to [
                            #
                            (end_gun'                             ==>  shut_down_outbuf_imp'),
                            (take_all_from_mailqueue' outbuf_q    ==>  do_outbuf_pleas)
                        ];

                        loop ();
                    }   
                    where
                        fun shut_down_outbuf_imp' ()
                            =
                            {
#                               sok::close socket;
                                    #
                                    # Reppy closed the socket here.
                                    #
                                    # I do NOT close the socket here because I want
                                    # to be able to kill off and recreate impnets
                                    # freely without affecting the xserver session.
                                    #
                                    # I think of outbuf as USING the socket but not
                                    # OWNING it.  We get handed the socket already
                                    # opened by some external agent, and it is up to
                                    # that external agent to close the socket when/if
                                    # it wants the socket closed.

                                thread_exit { success => TRUE };                                                        # Will not return.      
                            };

                        fun do_outbuf_pleas  []
                                =>
                                ();

                            do_outbuf_pleas  [ SEND_BYTEVECTOR bytevector ]
                                =>
                                skj::send_vector  (socket, bytevector);

                            do_outbuf_pleas  [ SEND_BYTEVECTORS bytevectors ]
                                =>
                                skj::send_vector (socket, (v1u::cat bytevectors));

                            do_outbuf_pleas  (FLUSH signal_fn ! rest)
                                =>
                                {
                                    signal_fn ();                                                                       # An upstream caller can use this to verify that all output to given point has been flushed beyond outbuf.
                                    do_outbuf_pleas  rest;
                                };

                            do_outbuf_pleas  pleas
                                =>
                                {
                                    (scan_pleas (pleas, [])) ->  (rest, vectors);
                                    skj::send_vector  (socket,  (v1u::cat  vectors));                                   # Not actually a blocking call, provided that I/O redirection is in effect. (Which it will be, except briefly early in bootstrap.)
                                    do_outbuf_pleas rest;
                                }
                                where
                                    # Find longest prefix consisting of SEND_BYTEVECTOR[S]
                                    # and return all the vectors from it, plus the
                                    # 'rest' of the pleas-list:
                                    #   
                                    fun scan_pleas ([], vectors)
                                            =>
                                            ([], reverse vectors);

                                        scan_pleas (SEND_BYTEVECTOR vector ! rest, vectors)
                                            =>
                                            scan_pleas (rest, vector ! vectors);

                                        scan_pleas (SEND_BYTEVECTORS vectors' ! rest, vectors)
                                            =>
                                            scan_pleas (rest, (reverse vectors') @ vectors);

                                        scan_pleas (rest as (FLUSH _ ! _), vectors)
                                            =>
                                            (rest, reverse vectors);
                                    end;
                                end;
                        end;
                    end;                                                                                                # fun loop
            end;                                                                                                        # fun run
        
        fun startup   (reply_oneshot:  Oneshot_Maildrop( (Me_Slot(X), Exports) ))   ()                                  # Root fn of imp microthread.  Note currying.
            =
            {   me_slot  =  make_mailslot  ()   :  Me_Slot(X);
                #
                xsequencer_to_outbuf =  { put_value, put_values, flush_outbuf };

                to          =  make_replyqueue();

                put_in_oneshot (reply_oneshot, (me_slot, { xsequencer_to_outbuf }));                                                    # Return value from outbuf_egg'().

                (take_from_mailslot  me_slot)                                                                           # Imports from outbuf_egg'().
                    ->
                    { me, imports, run_gun', end_gun', socket };

                block_until_mailop_fires  run_gun';                                                                     # Wait for the starting gun.

                run { me, outbuf_q, imports, to, end_gun', socket };                                                    # Will not return.
            }
            where
                outbuf_q =  make_mailqueue (get_current_microthread())  :  Outbuf_Q;

                fun put_value (vector: v1u::Vector)                                                                     # PUBLIC.
                    =   
                    put_in_mailqueue  (outbuf_q, SEND_BYTEVECTOR vector);

                fun put_values (vectors: List(v1u::Vector))                                                             # PUBLIC.
                    =   
                    put_in_mailqueue  (outbuf_q, SEND_BYTEVECTORS vectors);

                fun flush_outbuf (signal_fn: Void -> Void)                                                              # PUBLIC.
                    =   
                    put_in_mailqueue  (outbuf_q, FLUSH signal_fn);
            end;

        fun process_options (options: List(Option), { name })
            =
            {   my_name   = REF name;
                #
                apply  do_option  options
                where
                    fun do_option (MICROTHREAD_NAME n)  =   my_name := n;
                end;

                { name => *my_name };
            };

        ##########################################################################################
        # PUBLIC.
        #
        fun make_outbuf_egg                                                                                             # PUBLIC. PHASE 1: Construct our state and initialize from 'options'.
              ( socket:    sok::Socket (X, sok::Stream(sok::Active)),
                options:   List(Option)
              )
            =
            {   (process_options (options, { name => "outbuf" }))
                    ->
                    { name };

                me = REF ();

                \\ () = {   reply_oneshot = make_oneshot_maildrop() :  Oneshot_Maildrop( (Me_Slot(X), Exports) );       # PUBLIC. PHASE 2: Start our microthread and return our Exports to caller.
                            #
                            xlogger::make_thread  name  (startup  reply_oneshot);                                       # Note that startup() is curried.

                            (get_from_oneshot  reply_oneshot) -> (me_slot, exports);

                            fun phase3                                                                                  # PUBLIC. PHASE 3: Accept our Imports, then wait for Run_Gun to fire.
                                ( imports:      Imports,
                                  run_gun':     Run_Gun,        
                                  end_gun':     End_Gun
                                )
                                =
                                {
                                    put_in_mailslot  (me_slot, { me, imports, run_gun', end_gun', socket });
                                };

                            (exports, phase3);
                        };
            };
    };                                                                                                                  # package outbuf_ximp
end;

#       fun out_msg_to_string FLUSH_OUTBUF
#               =>
#               "OutFlush";
#
#           out_msg_to_string SHUT_DOWN_OUTBUF
#               =>
#               "OutQuit";
#
#           out_msg_to_string (ADD_TO_OUTBUF v)
#               =>
#               {   prefix_to_show
#                       =
#                       byte::unpack_string_vector
#                           (vector_slice_of_one_byte_unts::make_slice
#                               (v, 0, max_chars_to_trace_per_send)
#                           );
#
#                   case max_chars_to_trace_per_send
#                       #
#                       THE n =>    cat [ "Sent to X server: ",   string_to_hex    prefix_to_show,
#                                         "... == \"",            string_to_ascii  prefix_to_show,
#                                         "\"... (", int::to_string (v1u::length v), " bytes)"
#                                       ];
#
#                       NULL =>    cat [ "Sent to X server: ",    string_to_hex    prefix_to_show,
#                                         " == \"",               string_to_ascii  prefix_to_show,
#                                         "\"  (", int::to_string (v1u::length v), " bytes)"
#                                       ];
#                   esac;
#               };      
#       end;




Comments and suggestions to: bugs@mythryl.org

PreviousUpNext