PreviousUpNext

15.4.1670  src/lib/x-kit/xclient/src/wire/inbuf-ximp.pkg

## inbuf-ximp.pkg
#
##########################################################################################
# X socket input buffer imp.
#
# For the big picture see the imp dataflow diagrams in
#
#     src/lib/x-kit/xclient/src/window/xclient-ximps.pkg
#
# Here we monitor the input stream from the X-server
# socket and break it up into individual messages which
# are sent on out_mailslot to be unmarshalled and routed
# by the sequencer.
#
# We get three kinds of messages from the X server:
#       
#  o Replies to requests we have sent. Always at least 32 bytes long.
#  o Error messages.                   Always  exactly 32 bytes long.
#  o Events.                           Always  exactly 32 bytes long.
#       
# The first byte of the message distinguishes the three types.
#       
# For more details see (e.g.) p1 "1 Protocol Formats" in:
#
#     http://mythryl.org/pub/exene/X-protocol-R6.pdf
#       
# Our task here is to repetitively read one complete message
# from the X server socket (which on a reply means reading any
# extra databytes) and then forward to the sequencer a pair
#
#     (message-bytecode, message-bytes)

# where 'message-bytecode' is the first byte from the message
# and message-bytes is the complete message, including bytecode.

# Compiled by:
#     src/lib/x-kit/xclient/xclient-internals.sublib




                                                                # xevent_types                          is from   src/lib/x-kit/xclient/src/wire/xevent-types.pkg
                                                                # xerrors                               is from   src/lib/x-kit/xclient/src/wire/xerrors.pkg

stipulate
    include package   threadkit;                                # threadkit                             is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg
    #
    package mps =  microthread_preemptive_scheduler;            # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    package un  =  unt;                                         # unt                                   is from   src/lib/std/unt.pkg
    package uid =  issue_unique_id;                             # issue_unique_id                       is from   src/lib/src/issue-unique-id.pkg
#   package wv8 =  rw_vector_of_one_byte_unts;                  # rw_vector_of_one_byte_unts            is from   src/lib/std/src/rw-vector-of-one-byte-unts.pkg
    package psx =  posixlib;                                    # posixlib                              is from   src/lib/std/src/psx/posixlib.pkg
    package e2s =  xerror_to_string;                            # xerror_to_string                      is from   src/lib/x-kit/xclient/src/to-string/xerror-to-string.pkg
    package skj =  socket_junk;                                 # socket_junk                           is from   src/lib/internet/socket-junk.pkg
    package sok =  socket__premicrothread;                      # socket__premicrothread                is from   src/lib/std/socket--premicrothread.pkg
    package soc =  socket;                                      # socket                                is from   src/lib/std/src/socket/socket.pkg
    package v1u =  vector_of_one_byte_unts;                     # vector_of_one_byte_unts               is from   src/lib/std/src/vector-of-one-byte-unts.pkg
    package v2w =  value_to_wire;                               # value_to_wire                         is from   src/lib/x-kit/xclient/src/wire/value-to-wire.pkg
    package w2v =  wire_to_value;                               # wire_to_value                         is from   src/lib/x-kit/xclient/src/wire/wire-to-value.pkg
    package xps =  xpacket_sink;                                # xpacket_sink                          is from   src/lib/x-kit/xclient/src/wire/xpacket-sink.pkg
    #
    package g2d =  geometry2d;                                  # geometry2d                            is from   src/lib/std/2d/geometry2d.pkg
    package xtr =  xlogger;                                     # xlogger                               is from   src/lib/x-kit/xclient/src/stuff/xlogger.pkg
    #
    trace =  xtr::log_if  xtr::io_logging  0;                   # Conditionally write strings to tracing.log or whatever.

    std_packet_size = 32;                                       # Standard size-in-bytes for an X protocol message.




    # Convert "abc" -> "61.62.63." etc:
    #
    fun string_to_hex s
        =
        string::translate
            (\\ c =  number_string::pad_left '0' 2 (int::format number_string::HEX (char::to_int c)) + ".")
             s;

    # As above, starting with byte-vector:
    #
    fun bytes_to_hex  bytes
        =
        string_to_hex (byte::unpack_string_vector(vector_slice_of_one_byte_unts::make_slice (bytes, 0, NULL)));

    # Show printing chars verbatim, everything
    # else as '.', per hexdump tradition:
    #
    fun string_to_ascii s
        =
        string::translate
            (\\ c =  char::is_print c  ??  string::from_char c  ::  ".")
            s;

    # As above, starting with byte-vector:
    #
    fun bytes_to_ascii  bytes
        =
        string_to_ascii (byte::unpack_string_vector (vector_slice_of_one_byte_unts::make_slice (bytes, 0, NULL)));


    max_chars_to_trace_per_read = THE 10000;

    fun in_vector_to_string  v
        =
        {
        foo = vector_slice_of_one_byte_unts::make_full_slice v
      except
          x = {
log::note_on_stderr {. "in_vector_to_string/AAA000 -- inbuf-ximp.pkg\n"; };
exception_msg = exception_message x;
log::note_on_stderr {. "in_vector_to_string/AAA111 " + exception_msg + " -- inbuf-ximp.pkg\n"; };
                raise exception x;
              };

            prefix_to_show
                =
                byte::unpack_string_vector
                    (
                        foo
                    );

            case max_chars_to_trace_per_read
                #
                THE n =>    {
                                as_hex   = string_to_hex    prefix_to_show;
                                as_ascii = string_to_ascii  prefix_to_show;
                                len      =  (v1u::length v);
                                len      = int::to_string len;
                                cat [ "Read from X server: ",   as_hex,
                                      "... == \"",            as_ascii,
                                      "\"... (", len, " bytes -- inbuf-ximp.pkg)\n"
                                    ];
                            };
                NULL =>     {
                                cat [ "Read from X server: ",    string_to_hex    prefix_to_show,
                                      " == \"",               string_to_ascii  prefix_to_show,
                                      "\"  (", int::to_string (v1u::length v), " bytes -- inbuf-ximp.pkg)\n"
                                    ];
                            };
            esac;
        };      

herein

    # This imp is typically instantiated by:
    #
    #     src/lib/x-kit/xclient/src/wire/xsocket-ximps.pkg

    package   inbuf_ximp
    : (weak)  Inbuf_Ximp                                        # Inbuf_Ximp                            is from   src/lib/x-kit/xclient/src/wire/inbuf-ximp.api
    {
        Run_Gun = Mailop(Void);                                 # Purely for readability.
        End_Gun = Mailop(Void);                                 # Purely for readability.

        Inbuf_Ximp_State                                        # Holds all nonephemeral mutable state maintained by ximp.
            =
            { bytes_left_to_read:       Ref(Int),
              done_header:              Ref(Bool),
              saved_bytevectors:        Ref(List(v1u::Vector))
            };

        Xpacket = { code: v1u::Element,  packet: v1u::Vector }; # message-bytecode, message-bytes.
                                                                # code is first byte from message.
                                                                # 'packet'  is complete message, including code.

        Imports = { xpacket_sink:   xps::Xpacket_Sink };                                                                # Ports we use, provided by other imps.



        Me_Slot(X)
            =
            Mailslot(   {   imports:    Imports,
                            me:         Inbuf_Ximp_State,
                            run_gun':   Run_Gun,
                            end_gun':   End_Gun,
                            socket:     sok::Socket (X, sok::Stream(sok::Active))                                       # Socket to read.
                        }
                    );

        Exports = { };                                                                                                  # Ports we provide for use by other imps.


        Option = MICROTHREAD_NAME String;                                                       # 

        Inbuf_Egg =  Void -> (Exports,   (Imports, Run_Gun, End_Gun) -> Void);

        fun run {                                                                                                       # These values will be statically globally visible throughout the code body for the imp.
                  me:                   Inbuf_Ximp_State,                                                               # 
                  imports:              Imports,                                                                        # Ximps to which we send requests.
                  to:                   Replyqueue,                                                                     # The name makes   foo::pass_something(imp) to {. ... }   syntax read well.
                  end_gun':             End_Gun,                                                                        # We shut down the microthread when this fires.
                  socket:               sok::Socket (X, sok::Stream(sok::Active))                                       # Socket to read.
                }
            =
            {
                loop ();
            }
            where
                fun shut_down_inbuf_imp' ()
                    =
                    thread_exit { success => TRUE };                                                                    # Will not return.


                fun reset_packetreader_state ()
                    =
                    {   me.bytes_left_to_read :=  std_packet_size;
                        me.done_header        :=  FALSE;
                        me.saved_bytevectors  :=  [];
                    };

                fun handle_packet_header  packet
                    =
                    {   code   =  v1u::get (packet, 0);
                        #
                        case code
                            #
                            0u1 =>  # Reply -- may need to read additional data bytes.
                                    # 
                                    # Byte    0 contains the 'Reply' bytecode (0u1).
                                    # 
                                    # Bytes 1-4 contain the number of extra 32-bit words
                                    #           of data following the stock 32-byte header.
                                    {
                                        extra_dwords =  large_unt::to_int_x (pack_big_endian_unt1::get_vec (packet, 1));

                                        if (extra_dwords == 0)                                                                  # Need to read rest of packet.
                                            #
                                            imports.xpacket_sink.put_value { code, packet };                                    # Packet is complete, send it along to next processing step (xsequencer-ximp.pkg).

                                            reset_packetreader_state ();                                                        # Set up to read next packet.
                                        else
                                            me.bytes_left_to_read :=  4 * extra_dwords;                                         # "* 4" because we measure in bytes but X protocol measures in 32-bit words.  # 64-bit issue
                                            me.done_header        :=  TRUE;
                                            me.saved_bytevectors  :=  [ packet ];                                               # Packet is incomplete:  wait for rest of it to be read.
                                        fi;
                                    };

                            k =>    {   imports.xpacket_sink.put_value { code, packet };                                        # event or error. Packet is complete, send it along to next processing step (xsequencer-ximp.pkg).
                                        #
                                        reset_packetreader_state ();                                                            # Set up to read next packet.
                                    };
                        esac;
                    };

                fun do_bytevector' { new_bytevector, done_header => FALSE, still_to_read => 0, old_bytevectors => [] }          # Packet header is complete:  Go process it.
                        =>
                        handle_packet_header  new_bytevector;

                    do_bytevector' { new_bytevector, done_header => FALSE, still_to_read => 0, old_bytevectors }                # Packet header is complete:  Go process it.
                        =>
                        handle_packet_header  (v1u::cat (list::reverse (new_bytevector ! old_bytevectors)));

                    do_bytevector' { new_bytevector, done_header => FALSE, still_to_read, old_bytevectors }                     # Packet header is incomplete:  Note addition to it, wait for rest to arrive.
                        =>
                        {   me.saved_bytevectors  :=  new_bytevector ! old_bytevectors;
                            me.bytes_left_to_read :=  still_to_read;
                        };

                    do_bytevector' { new_bytevector, done_header => TRUE, still_to_read => 0, old_bytevectors }                 # Packet is complete (both header and extra words have been read)
                        =>                                                                                                      # so send it along to next processing step (xsequencer-ximp.pkg).
                        {   packet =   v1u::cat (list::reverse (new_bytevector ! old_bytevectors));
                            code   =   v1u::get (packet, 0);                                                                    # Has to be 0u1 -- only replies have post-header bytes.

                            imports.xpacket_sink.put_value { code, packet };                                                    # Packet is complete, send it along to next processing step (xsequencer-ximp.pkg).

                            reset_packetreader_state ();                                                                        # Set up to read next packet.
                        };

                    do_bytevector' { new_bytevector, done_header => TRUE, still_to_read, old_bytevectors }                      # Packet is incomplete:  Note addition to it, wait for remaining extra words to arrive.
                        =>
                        {   me.saved_bytevectors  :=  new_bytevector ! old_bytevectors;
                            me.bytes_left_to_read :=  still_to_read;
                        };
                end;

                fun do_bytevector  new_bytevector
                    =
                    {   bytecount =   v1u::length  new_bytevector; 
                        #
                        if (bytecount == 0)
                            msg = "socket closed -- inbuf-ximp.pkg: read_vector()";
                            log::fatal msg;
                            raise exception DIE msg;                                                                            # We need a more graceful way to signal that the socket has closed.  XXX SUCKO FIXME
                        fi;

                        do_bytevector'  { new_bytevector,
                                          done_header     =>  *me.done_header,
                                          still_to_read   =>  *me.bytes_left_to_read - bytecount,
                                          old_bytevectors =>  *me.saved_bytevectors
                                        };
                    };

                fun loop ()
                    =
                    {
                        do_one_mailop
                          [
                            end_gun' ==>  shut_down_inbuf_imp',

                            soc::receive_vektor' (socket, *me.bytes_left_to_read)
                                ==>
                                do_bytevector
                          ];

                        loop ();
                    };
            end;                                                                                                        # fun run

        fun startup   (reply_oneshot:  Oneshot_Maildrop( (Me_Slot(X), Exports) ))   ()                                  # Root fn of imp microthread.  Note currying.
            =
            {   me_slot =  make_mailslot  ()    :  Me_Slot(X);
                #
                to          =  make_replyqueue();

                put_in_oneshot (reply_oneshot, (me_slot, { }));                                                         # Return value from inbuf_egg'().

                (take_from_mailslot  me_slot)                                                                           # Imports from inbuf_egg'().
                    ->
                    { me, imports, run_gun', end_gun', socket };

                block_until_mailop_fires  run_gun';                                                                     # Wait for the starting gun.

                run { me, imports, to, end_gun', socket };                                                              # Will not return.
            };

        fun process_options (options: List(Option), { name })
            =
            {   my_name   = REF name;
                #
                apply  do_option  options
                where
                    fun do_option (MICROTHREAD_NAME n)  =   my_name := n;
                end;

                { name => *my_name };
            };

        ##########################################################################################
        # PUBLIC.
        #
        fun make_inbuf_egg                                                                                              # PUBLIC. PHASE 1: Construct our state and initialize from 'options'.
              (
                socket:         sok::Socket (X, sok::Stream(sok::Active)),                                              # Socket to read.
                options:        List(Option))
            =
            {   (process_options (options, { name => "inbuf" }))
                    ->
                    { name };

                me =    { bytes_left_to_read =>  REF std_packet_size,
                          done_header        =>  REF FALSE,
                          saved_bytevectors  =>  REF []
                        };


                \\ () = {   reply_oneshot = make_oneshot_maildrop():  Oneshot_Maildrop( (Me_Slot(X), Exports) );        # PUBLIC. PHASE 2: Start our microthread and return our Exports to caller.
                            #
                            xlogger::make_thread  name  (startup  reply_oneshot);                                       # Note that startup() is curried.

                            (get_from_oneshot  reply_oneshot) -> (me_slot, exports);

                            fun phase3                                                                                  # PUBLIC. PHASE 3: Accept our Imports, then wait for Run_Gun to fire.
                                (
                                  imports:      Imports,
                                  run_gun':     Run_Gun,        
                                  end_gun':     End_Gun
                                )
                                =
                                {
                                    put_in_mailslot  (me_slot, { me, imports, run_gun', end_gun', socket });
                                };

                            (exports, phase3);
                        };
            };

    };                                          # package inbuf_ximp
end;


#                       # Tracelogging version of get_packet:
#                       #
#                       get_xpacket =  {.   (get_xpacket ())
#                                           ->
#                                           (result as { code, packet } );
#
#                                       xlogger::log_if xlogger::io_logging  0
#                                          {.   prefix_to_show
#                                                   =
#                                                   byte::unpack_string_vector
#                                                       (vector_slice_of_one_byte_unts::make_slice (packet, 0, max_chars_to_trace_per_read));
#
#
#                                               case max_chars_to_trace_per_read
#                                                   #
#                                                   THE n =>    cat [ "Read from X server: code=", one_byte_unt::to_string code,
#                                                                     "  len=", int::to_string (v1u::length packet),
#                                                                     "  body=",                string_to_hex    prefix_to_show,
#                                                                     "... == \"",              string_to_ascii  prefix_to_show,
#                                                                     "\"..."
#                                                                   ];
#
#                                                   NULL =>     cat [ "Read from X server: code=", one_byte_unt::to_string code,
#                                                                     "  len=", int::to_string (v1u::length packet),
#                                                                     "  body=",                string_to_hex    prefix_to_show,
#                                                                     " == \"",                 string_to_ascii  prefix_to_show,
#                                                                     "\""
#                                                                   ];
#                                               esac;
#                                       };
#
#                                       result;
#                                   };



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext