## inbuf-ximp.pkg
#
##########################################################################################
# X socket input buffer imp.
#
# For the big picture see the imp dataflow diagrams in
#
#     src/lib/x-kit/xclient/src/window/xclient-ximps.pkg
#
# Here we monitor the input stream from the X-server
# socket and break it up into individual messages which
# are sent on out_mailslot to be unmarshalled and routed
# by the sequencer.
#
# We get three kinds of messages from the X server:
#
# o Replies to requests we have sent. Always at least 32 bytes long.
# o Error messages. Always exactly 32 bytes long.
# o Events. Always exactly 32 bytes long.
#
# The first byte of the message distinguishes the three types.
#
# For more details see (e.g.) p1 "1 Protocol Formats" in:
#
# http://mythryl.org/pub/exene/X-protocol-R6.pdf
#
# Our task here is to repetitively read one complete message
# from the X server socket (which on a reply means reading any
# extra databytes) and then forward to the sequencer a pair
#
# (message-bytecode, message-bytes)
#
# where 'message-bytecode' is the first byte from the message
# and message-bytes is the complete message, including bytecode.
# Compiled by:
#     src/lib/x-kit/xclient/xclient-internals.sublib

# xevent_types is from   src/lib/x-kit/xclient/src/wire/xevent-types.pkg
# xerrors      is from   src/lib/x-kit/xclient/src/wire/xerrors.pkg

stipulate
include package threadkit;                          # threadkit                        is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg
#
package mps = microthread_preemptive_scheduler;     # microthread_preemptive_scheduler is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
#
package un  = unt;                                  # unt                              is from   src/lib/std/unt.pkg
package uid = issue_unique_id;                      # issue_unique_id                  is from   src/lib/src/issue-unique-id.pkg
# package wv8 = rw_vector_of_one_byte_unts;         # rw_vector_of_one_byte_unts       is from   src/lib/std/src/rw-vector-of-one-byte-unts.pkg
package psx = posixlib;                             # posixlib                         is from   src/lib/std/src/psx/posixlib.pkg
package e2s = xerror_to_string;                     # xerror_to_string                 is from   src/lib/x-kit/xclient/src/to-string/xerror-to-string.pkg
package skj = socket_junk;                          # socket_junk                      is from   src/lib/internet/socket-junk.pkg
package sok = socket__premicrothread;               # socket__premicrothread           is from   src/lib/std/socket--premicrothread.pkg
package soc = socket;                               # socket                           is from   src/lib/std/src/socket/socket.pkg
package v1u = vector_of_one_byte_unts;              # vector_of_one_byte_unts          is from   src/lib/std/src/vector-of-one-byte-unts.pkg
package v2w = value_to_wire;                        # value_to_wire                    is from   src/lib/x-kit/xclient/src/wire/value-to-wire.pkg
package w2v = wire_to_value;                        # wire_to_value                    is from   src/lib/x-kit/xclient/src/wire/wire-to-value.pkg
package xps = xpacket_sink;                         # xpacket_sink                     is from   src/lib/x-kit/xclient/src/wire/xpacket-sink.pkg
#
package g2d = geometry2d;                           # geometry2d                       is from   src/lib/std/2d/geometry2d.pkg
package xtr = xlogger;                              # xlogger                          is from   src/lib/x-kit/xclient/src/stuff/xlogger.pkg
#
# Tracing helper: xtr::log_if partially applied to the xtr::io_logging selector and 0.
# NOTE(review): the meaning of the 0 argument is not visible in this file -- confirm against xlogger.pkg.
trace = xtr::log_if xtr::io_logging 0; # Conditionally write strings to tracing.log or whatever.
# Initial value of bytes_left_to_read for every packet: the reader always
# starts by collecting this many bytes before looking at the packet's first byte.
std_packet_size = 32; # Standard size-in-bytes for an X protocol message.
# Render every character of 's' as a two-digit hex code
# followed by a period, so that "abc" becomes "61.62.63.":
#
fun string_to_hex s
    =
    string::translate  char_to_hex  s
    where
        # One character -> zero-padded two-digit hex plus trailing ".".
        #
        fun char_to_hex c
            =
            {   digits = int::format number_string::HEX (char::to_int c);
                #
                (number_string::pad_left '0' 2 digits) + ".";
            };
    end;
# Same rendering as string_to_hex, but the input is a byte-vector:
# the whole vector is reinterpreted as a string and then hex-dumped.
#
fun bytes_to_hex bytes
    =
    {   whole_vector = vector_slice_of_one_byte_unts::make_slice (bytes, 0, NULL);      # NULL length == "through end of vector".
        as_string    = byte::unpack_string_vector  whole_vector;
        #
        string_to_hex  as_string;
    };
# Hexdump-style text column: characters for which char::is_print
# holds are kept as-is, every other character is shown as '.':
#
fun string_to_ascii s
    =
    string::translate  printable_or_dot  s
    where
        # One character -> itself if printable, else ".".
        #
        fun printable_or_dot c
            =
            char::is_print c   ??   string::from_char c
                               ::   ".";
    end;
# Same rendering as string_to_ascii, but the input is a byte-vector:
# the whole vector is reinterpreted as a string and then filtered.
#
fun bytes_to_ascii bytes
    =
    {   whole_vector = vector_slice_of_one_byte_unts::make_slice (bytes, 0, NULL);      # NULL length == "through end of vector".
        as_string    = byte::unpack_string_vector  whole_vector;
        #
        string_to_ascii  as_string;
    };
# Trace-output limit consulted by in_vector_to_string:
# THE n is meant to cap how many bytes of a read are shown, NULL means "no cap".
max_chars_to_trace_per_read = THE 10000;
# Build a one-line trace message describing a byte-vector read from the X server.
#
# The message has the form
#
#     Read from X server: <hex dump> == "<ascii dump>" (<total> bytes -- inbuf-ximp.pkg)
#
# When max_chars_to_trace_per_read is THE n, only the first n bytes of 'v'
# are dumped; "..." is appended to both dumps if (and only if) bytes were
# actually left out.  When it is NULL the whole vector is always dumped.
# The byte count reported is always the full length of 'v'.
#
# (Previously the THE n case ignored n entirely: it dumped the whole vector
# while unconditionally printing "..." as if the output had been truncated.)
#
fun in_vector_to_string v
    =
    {   total_len = v1u::length v;

        # Number of leading bytes to show.  Clamped to [0, total_len] so
        # that make_slice can never be handed an out-of-range length.
        #
        shown_len
            =
            case max_chars_to_trace_per_read
                #
                THE n =>  int::max (0, int::min (n, total_len));
                NULL  =>  total_len;
            esac;

        shown
            =
            byte::unpack_string_vector
                (vector_slice_of_one_byte_unts::make_slice (v, 0, THE shown_len));

        # Mark the dumps as partial only when they really are.
        #
        ellipsis =  (shown_len < total_len)  ??  "..."  ::  "";

        cat [ "Read from X server: ",  string_to_hex shown,    ellipsis,
              " == \"",                string_to_ascii shown,  "\"",  ellipsis,
              " (",  int::to_string total_len,  " bytes -- inbuf-ximp.pkg)\n"
            ];
    };
herein
# This imp is typically instantiated by:
#
#     src/lib/x-kit/xclient/src/wire/xsocket-ximps.pkg

package inbuf_ximp
: (weak) Inbuf_Ximp                                 # Inbuf_Ximp is from   src/lib/x-kit/xclient/src/wire/inbuf-ximp.api
{
# Mailops used as start/stop signals for the imp microthread.
# startup() blocks on the Run_Gun before calling run(); loop() selects on the End_Gun.
Run_Gun = Mailop(Void); # Purely for readability.
End_Gun = Mailop(Void); # Purely for readability.
# Packet-reassembly state.  A packet may arrive in several socket reads,
# so progress on the current packet is kept here between reads.
Inbuf_Ximp_State # Holds all nonephemeral mutable state maintained by ximp.
=
{ bytes_left_to_read: Ref(Int), # Bytes still needed to finish the current header, or the current reply's extra data.
done_header: Ref(Bool), # TRUE once the 32-byte header has been handled and we are collecting a reply's extra data.
saved_bytevectors: Ref(List(v1u::Vector)) # Pieces of the current packet read so far, most recent first (reversed before concatenation).
};
Xpacket = { code: v1u::Element, packet: v1u::Vector }; # message-bytecode, message-bytes.
# code is first byte from message.
# 'packet' is complete message, including code.
Imports = { xpacket_sink: xps::Xpacket_Sink }; # Ports we use, provided by other imps.  Completed packets are handed to xpacket_sink.put_value.
# Mailslot over which phase3 (see make_inbuf_egg) hands the imp microthread
# everything it needs to run; startup() takes exactly one such record from it.
Me_Slot(X)
=
Mailslot( { imports: Imports,
me: Inbuf_Ximp_State,
run_gun': Run_Gun,
end_gun': End_Gun,
socket: sok::Socket (X, sok::Stream(sok::Active)) # Socket to read.
}
);
Exports = { }; # Ports we provide for use by other imps.  Currently none.
Option = MICROTHREAD_NAME String; # Overrides the default microthread name ("inbuf") -- see process_options.
# Shape of the value returned by make_inbuf_egg: calling it starts the microthread and
# yields our Exports plus a function accepting (Imports, Run_Gun, End_Gun).
Inbuf_Egg = Void -> (Exports, (Imports, Run_Gun, End_Gun) -> Void);
fun run { # These values will be statically globally visible throughout the code body for the imp.
me: Inbuf_Ximp_State, #
imports: Imports, # Ximps to which we send requests.
to: Replyqueue, # The name makes foo::pass_something(imp) to {. ... } syntax read well.
end_gun': End_Gun, # We shut down the microthread when this fires.
socket: sok::Socket (X, sok::Stream(sok::Active)) # Socket to read.
}
=
{
loop ();
}
where
# Action run by loop() when end_gun' fires: exit this imp's microthread, reporting success.
fun shut_down_inbuf_imp' ()
=
thread_exit { success => TRUE }; # Will not return.
# Return the reassembly state to "start of a new packet":
# expect a full std_packet_size-byte header, with nothing accumulated yet.
# Called each time a complete packet has been passed to the xpacket sink.
fun reset_packetreader_state ()
=
{ me.bytes_left_to_read := std_packet_size; # Next thing to read is a whole header.
me.done_header := FALSE; # ... which we have not seen yet.
me.saved_bytevectors := []; # Discard the pieces of the packet just forwarded.
};
# Process a complete std_packet_size-byte packet header.
#
# The first byte of 'packet' is the message bytecode:
#
#   o 0u1 marks a reply.  A reply may be followed by extra data; the count of
#     extra 32-bit words is read with pack_big_endian_unt1::get_vec (packet, 1).
#     NOTE(review): in the X11 protocol the reply-length field occupies
#     bytes 4-7 of the header, so index 1 is presumably a 32-bit-word index --
#     confirm against pack_big_endian_unt1.
#       - No extra words:  the packet is already complete, forward it.
#       - Otherwise:       remember the header and arrange to read the rest.
#
#   o Anything else (event or error) is complete as it stands: forward it.
#
fun handle_packet_header packet
    =
    {   code = v1u::get (packet, 0);

        # Hand the finished packet to the next processing step
        # (xsequencer-ximp.pkg) and get ready for the following packet.
        #
        fun forward_packet ()
            =
            {   imports.xpacket_sink.put_value { code, packet };
                reset_packetreader_state ();
            };

        case code
            #
            0u1 =>
                {   extra_dwords = large_unt::to_int_x (pack_big_endian_unt1::get_vec (packet, 1));

                    if (extra_dwords != 0)
                        #
                        # Reply with trailing data: packet is incomplete, wait for the rest of it to be read.
                        #
                        me.bytes_left_to_read := 4 * extra_dwords;      # "4 *" because we count bytes but the X protocol counts 32-bit words.  (64-bit issue.)
                        me.done_header        := TRUE;
                        me.saved_bytevectors  := [ packet ];
                    else
                        forward_packet ();                              # Header-only reply: nothing more to read.
                    fi;
                };

            _ => forward_packet ();                                     # Event or error.
        esac;
    };
# Fold one freshly read byte-vector into the packet currently being assembled.
#
#   new_bytevector    Bytes just read from the socket.
#   done_header       TRUE iff the header was already handled and we are reading a reply's extra data.
#   still_to_read     Bytes of the current header / extra data still outstanding after this read.
#   old_bytevectors   Earlier pieces of the current packet, most recent first.
#
# Three outcomes:
#   o More bytes outstanding:          stash the new piece and keep waiting.
#   o Extra data now complete:         concatenate all pieces and forward the packet.
#   o Header now complete:             pass the (reassembled) header to handle_packet_header.
#
fun do_bytevector' { new_bytevector, done_header, still_to_read, old_bytevectors }
    =
    if (still_to_read != 0)
        #
        # Header or extra data still incomplete: note the addition, wait for the rest to arrive.
        #
        me.saved_bytevectors  := new_bytevector ! old_bytevectors;
        me.bytes_left_to_read := still_to_read;
    else
        if done_header
            #
            # Both header and extra words have been read: send packet to next processing step (xsequencer-ximp.pkg).
            #
            packet = v1u::cat (list::reverse (new_bytevector ! old_bytevectors));
            code   = v1u::get (packet, 0);                              # Has to be 0u1 -- only replies have post-header bytes.
            imports.xpacket_sink.put_value { code, packet };
            reset_packetreader_state ();                                # Set up to read next packet.
        else
            # Packet header is complete: go process it.
            #
            case old_bytevectors
                #
                [] => handle_packet_header new_bytevector;              # Header arrived in a single read; no concatenation needed.
                _  => handle_packet_header (v1u::cat (list::reverse (new_bytevector ! old_bytevectors)));
            esac;
        fi;
    fi;
# Handle the result of one socket read.
#
# A zero-length read is treated as "socket closed": it is logged via
# log::fatal and then raised as a DIE exception.  Any other read is passed
# to do_bytevector' together with a snapshot of the current reassembly
# state and the number of bytes still outstanding after this read.
#
fun do_bytevector new_bytevector
    =
    {   bytes_received = v1u::length new_bytevector;

        if (bytes_received == 0)
            #
            msg = "socket closed -- inbuf-ximp.pkg: read_vector()";
            log::fatal msg;
            raise exception DIE msg;                                    # We need a more graceful way to signal that the socket has closed.  XXX SUCKO FIXME
        fi;

        still_to_read = *me.bytes_left_to_read - bytes_received;

        do_bytevector'
          { new_bytevector,
            still_to_read,
            done_header     => *me.done_header,
            old_bytevectors => *me.saved_bytevectors
          };
    };
# Main loop of the imp.  Each iteration waits for whichever happens first:
#
#   o end_gun' fires           -> shut the microthread down (does not return), or
#   o bytes arrive on 'socket' -> feed them to do_bytevector.
#
# The read asks for at most the number of bytes still needed for the
# current header or reply body, as recorded in me.bytes_left_to_read.
#
fun loop ()
    =
    {   shutdown' =  end_gun'  ==>  shut_down_inbuf_imp';
        input'    =  soc::receive_vektor' (socket, *me.bytes_left_to_read)  ==>  do_bytevector;

        do_one_mailop [ shutdown', input' ];

        loop ();
    };
end; # fun run
# Body of the imp microthread.  Curried: make_inbuf_egg applies it to 'reply_oneshot'
# and passes the resulting Void -> ... function to xlogger::make_thread.
#
# Handshake, in order:
#   1. Create our mailslot and publish (me_slot, Exports) through reply_oneshot.
#   2. Take from me_slot the record which phase3 (in make_inbuf_egg) puts there.
#   3. Block until run_gun' fires, then enter run().
fun startup (reply_oneshot: Oneshot_Maildrop( (Me_Slot(X), Exports) )) () # Root fn of imp microthread. Note currying.
=
{ me_slot = make_mailslot () : Me_Slot(X);
#
to = make_replyqueue(); # Passed on to run().
put_in_oneshot (reply_oneshot, (me_slot, { })); # Picked up by the phase-2 closure in make_inbuf_egg; { } is our (empty) Exports.
(take_from_mailslot me_slot) # State, imports, guns and socket sent by phase3 in make_inbuf_egg.
->
{ me, imports, run_gun', end_gun', socket };
block_until_mailop_fires run_gun'; # Wait for the starting gun.
run { me, imports, to, end_gun', socket }; # Will not return.
};
# Reduce an option list to the settings it selects.
#
#   options    Options supplied by the caller of make_inbuf_egg.
#   { name }   Default microthread name, used when no option overrides it.
#
# Returns { name }, where 'name' comes from the last MICROTHREAD_NAME
# option in the list, or is the supplied default if there is none.
#
fun process_options (options: List(Option), { name })
    =
    { name => pick_name (options, name) }
    where
        # Walk the list left to right; each MICROTHREAD_NAME replaces the current choice.
        #
        fun pick_name ([], chosen)
                =>
                chosen;

            pick_name ((MICROTHREAD_NAME n) ! rest, _)
                =>
                pick_name (rest, n);
        end;
    end;
##########################################################################################
# PUBLIC.
#
# Three-phase constructor for the inbuf imp.
#
#   PHASE 1  (calling make_inbuf_egg):  resolve options and allocate the reassembly
#            state 'me'.  No microthread exists yet.
#   PHASE 2  (calling the returned Void -> ... function):  start the microthread running
#            startup(), wait for it to publish its mailslot, and return (exports, phase3).
#   PHASE 3  (calling phase3):  send the microthread its state, imports, guns and socket.
#            The microthread itself then waits for run_gun' before it starts reading.
#
# NOTE(review): 'me' is allocated once in phase 1, so every invocation of the phase-2
# closure would hand the same state record to a new microthread -- presumably the
# closure is only ever called once; confirm against callers.
fun make_inbuf_egg # PUBLIC. PHASE 1: Construct our state and initialize from 'options'.
(
socket: sok::Socket (X, sok::Stream(sok::Active)), # Socket to read.
options: List(Option)) # Currently only MICROTHREAD_NAME; default name is "inbuf".
=
{ (process_options (options, { name => "inbuf" }))
->
{ name };
me = { bytes_left_to_read => REF std_packet_size, # Start out expecting one full packet header.
done_header => REF FALSE,
saved_bytevectors => REF []
};
\\ () = { reply_oneshot = make_oneshot_maildrop(): Oneshot_Maildrop( (Me_Slot(X), Exports) ); # PUBLIC. PHASE 2: Start our microthread and return our Exports to caller.
#
xlogger::make_thread name (startup reply_oneshot); # Note that startup() is curried.
(get_from_oneshot reply_oneshot) -> (me_slot, exports); # Wait for startup() to publish its mailslot and Exports.
fun phase3 # PUBLIC. PHASE 3: Accept our Imports and pass them, with state/guns/socket, to the microthread.
(
imports: Imports,
run_gun': Run_Gun,
end_gun': End_Gun
)
=
{
put_in_mailslot (me_slot, { me, imports, run_gun', end_gun', socket }); # Taken by startup(), which then blocks on run_gun'.
};
(exports, phase3);
};
};
}; # package inbuf_ximp
end;
# # Tracelogging version of get_packet:
# #
# get_xpacket = {. (get_xpacket ())
# ->
# (result as { code, packet } );
#
# xlogger::log_if xlogger::io_logging 0
# {. prefix_to_show
# =
# byte::unpack_string_vector
# (vector_slice_of_one_byte_unts::make_slice (packet, 0, max_chars_to_trace_per_read));
#
#
# case max_chars_to_trace_per_read
# #
# THE n => cat [ "Read from X server: code=", one_byte_unt::to_string code,
# " len=", int::to_string (v1u::length packet),
# " body=", string_to_hex prefix_to_show,
# "... == \"", string_to_ascii prefix_to_show,
# "\"..."
# ];
#
# NULL => cat [ "Read from X server: code=", one_byte_unt::to_string code,
# " len=", int::to_string (v1u::length packet),
# " body=", string_to_hex prefix_to_show,
# " == \"", string_to_ascii prefix_to_show,
# "\""
# ];
# esac;
# };
#
# result;
# };