## outbuf-ximp.pkg
#
# For the big picture see the imp dataflow diagrams in
#
#     src/lib/x-kit/xclient/src/window/xclient-ximps.pkg

# Compiled by:
#     src/lib/x-kit/xclient/xclient-internals.sublib

# xevent_types is from   src/lib/x-kit/xclient/src/wire/xevent-types.pkg
# xerrors      is from   src/lib/x-kit/xclient/src/wire/xerrors.pkg

stipulate
# Package abbreviations used throughout this file, plus the trace helper.
# Each trailing comment names the source file that defines the package.
#
include package   threadkit;				# threadkit			is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg
#
package op  =  xsequencer_to_outbuf;			# xsequencer_to_outbuf		is from   src/lib/x-kit/xclient/src/wire/xsequencer-to-outbuf.pkg
package skj =  socket_junk;				# socket_junk			is from   src/lib/internet/socket-junk.pkg
package sok =  socket__premicrothread;			# socket__premicrothread	is from   src/lib/std/socket--premicrothread.pkg
package v1u =  vector_of_one_byte_unts;			# vector_of_one_byte_unts	is from   src/lib/std/src/vector-of-one-byte-unts.pkg
package iox =  io_exceptions;				# io_exceptions			is from   src/lib/std/src/io/io-exceptions.pkg
#
package xtr =  xlogger;					# xlogger			is from   src/lib/x-kit/xclient/src/stuff/xlogger.pkg
#
trace =  xtr::log_if  xtr::io_logging  0;		# Conditionally write strings to tracing.log or whatever.
#
# Produce a short human-readable description of an exception.
#
# iox::IO exceptions are rendered as
#     Io: <op> failed on "<name>"<reason>
# where <reason> is a fixed phrase for the recognized causes and
# otherwise the (recursively rendered) description of the cause.
# Exceptions not listed below are rendered as "unknown".
#
fun exception_message  some_exception
    =
    case some_exception
	#
	iox::IO { cause, op, name }
	    =>
	    {   reason
		    =
		    case cause
			#
			iox::BLOCKING_IO_NOT_SUPPORTED		=>  [", blocking I/O not supported"];
			iox::RANDOM_ACCESS_IO_NOT_SUPPORTED	=>  [", random access not supported"];
			iox::TERMINATED_INPUT_STREAM		=>  [", terminated input stream"];
			iox::CLOSED_IO_STREAM			=>  [", closed stream"];
			_					=>  [" with exception ", exception_message cause];
		    esac;

		cat ("Io: " ! op ! " failed on \"" ! name ! "\"" ! reason);
	    };

	DIE s			=>  "DIE: " + s;
	BIND			=>  "nonexhaustive naming failure";	# NOTE: we should probably include line/file info for MATCH and BIND XXX BUGGO FIXME
	MATCH			=>  "nonexhaustive match failure";
	INDEX_OUT_OF_BOUNDS	=>  "index out of bounds";
	SIZE			=>  "size";
	OVERFLOW		=>  "overflow";
	DIVIDE_BY_ZERO		=>  "divide by zero";
	DOMAIN			=>  "domain error";
	_			=>  "unknown";
    esac;
# Hex-dump a string:  each character becomes its character code in hex,
# zero-padded on the left to at least two digits and followed by a
# period, so that "abc" -> "61.62.63."
#
fun string_to_hex s
    =
    string::translate  char_to_hex  s
    where
	fun char_to_hex c
	    =
	    {   digits =  int::format  number_string::HEX  (char::to_int c);
		#
		(number_string::pad_left '0' 2 digits)  +  ".";
	    };
    end;
# Same hex dump as string_to_hex, but starting from a byte-vector:
# the vector is sliced from offset 0 with no explicit length,
# unpacked into a string, and that string is hex-dumped.
#
fun bytes_to_hex bytes
    =
    {   byte_slice =  vector_slice_of_one_byte_unts::make_slice (bytes, 0, NULL);
	as_string  =  byte::unpack_string_vector  byte_slice;
	#
	string_to_hex  as_string;
    };
# Render a string the way the right-hand column of a hexdump does:
# printable characters pass through unchanged, and every other
# character is replaced by a period.
#
fun string_to_ascii s
    =
    string::translate  printable_or_dot  s
    where
	fun printable_or_dot c
	    =
	    if (char::is_print c)   string::from_char c;
	    else                    ".";
	    fi;
    end;
# Same printable-or-dot rendering as string_to_ascii, but starting
# from a byte-vector:  the vector is sliced from offset 0 with no
# explicit length, unpacked into a string, and that string is rendered.
#
fun bytes_to_ascii bytes
    =
    {   byte_slice =  vector_slice_of_one_byte_unts::make_slice (bytes, 0, NULL);
	as_string  =  byte::unpack_string_vector  byte_slice;
	#
	string_to_ascii  as_string;
    };
# Upper bound on the number of bytes of one outgoing vector which
# out_vector_to_string will render.  NULL means "render everything".
#
max_chars_to_trace_per_send =  THE 10000;

# Build a one-line trace message describing a byte-vector sent to the
# X server:  a hex dump, a printable-ASCII dump, and the total length.
#
# At most max_chars_to_trace_per_send bytes are dumped.  The reported
# byte count is always the full length of 'v', not the dumped length.
#
# Previously the limit was never applied (the whole vector was dumped
# even though the message ends in "..."), so one large request could
# produce an enormous trace line.
#
fun out_vector_to_string v
    =
    {   total_bytes =  v1u::length v;
	#
	# Clamp to the vector length, so the slice request
	# never extends past the end of a short vector.
	#
	shown_bytes
	    =
	    case max_chars_to_trace_per_send
		#
		THE limit =>  ((limit < total_bytes)  ??  limit  ::  total_bytes);
		NULL      =>  total_bytes;
	    esac;

	prefix_slice
	    =
	    vector_slice_of_one_byte_unts::make_slice (v, 0, THE shown_bytes)
	    except
		x = {
			# Log what went wrong, then let the exception propagate.
			#
			log::note_on_stderr {. "out_vector_to_string/AAA000 -- outbuf-ximp.pkg\n"; };
			exception_msg = exception_message x;
			log::note_on_stderr {. "out_vector_to_string/AAA111 " + exception_msg + " -- outbuf-ximp.pkg\n"; };
			raise exception x;
		    };

	prefix_to_show =  byte::unpack_string_vector  prefix_slice;
	#
	as_hex   =  string_to_hex    prefix_to_show;
	as_ascii =  string_to_ascii  prefix_to_show;
	len      =  int::to_string   total_bytes;

	case max_chars_to_trace_per_send
	    #
	    THE _ =>    cat [ "Sent to X server: ", as_hex,
			      "... == \"",          as_ascii,
			      "\"... (", len, " bytes -- outbuf-ximp.pkg)\n"
			    ];

	    NULL  =>    cat [ "Sent to X server: ", as_hex,
			      " == \"",             as_ascii,
			      "\" (", len, " bytes -- outbuf-ximp.pkg)\n"
			    ];
	esac;
    };
herein
# This imp is typically instantiated by:
#
#     src/lib/x-kit/xclient/src/wire/xsocket-ximps.pkg

package   outbuf_ximp
: (weak)  Outbuf_Ximp						# Outbuf_Ximp	is from   src/lib/x-kit/xclient/src/wire/outbuf-ximp.api
{
# Mutable per-imp state.  At present this is a reference to Void,
# i.e. it carries no data.
Outbuf_State # Holds all nonephemeral mutable state maintained by ximp.
=
Ref( Void );
# This imp consumes no ports from other imps, so the record is empty.
Imports = { }; # PUBLIC. Ports we use, provided by other imps.
# Mailslot through which phase 3 of make_outbuf_egg hands the imp
# microthread everything it needs before it starts running.
Me_Slot(X)
=
Mailslot( { imports: Imports,
me: Outbuf_State,
run_gun': Run_Gun,
end_gun': End_Gun,
socket: sok::Socket (X, sok::Stream(sok::Active)) # Socket to write:  every use in this file is skj::send_vector.
}
);
# NB: We've eliminated the Foo_Plea type from other imps in favor of just passing
# Runstate -> Void thunks through the mailqueue, but we've retained them here
# because we want to batch-process our message-queue contents, which requires
# that we have more visibility into the queue contents than allowed by thunks.
#
#   SEND_BYTEVECTOR   One byte-vector to write to the socket.
#   SEND_BYTEVECTORS  Several byte-vectors to write, in list order.
#   FLUSH             Callback invoked once the imp reaches this plea in its queue.
Outbuf_Plea = SEND_BYTEVECTOR v1u::Vector #
| SEND_BYTEVECTORS List( v1u::Vector )
| FLUSH (Void -> Void)
;
# Queue of pending pleas; filled by the exported functions, drained by run().
Outbuf_Q = Mailqueue( Outbuf_Plea );
# The single port this imp offers to its clients.
Exports = { # PUBLIC.
xsequencer_to_outbuf: op::Xsequencer_To_Outbuf
};
# Construction-time options.  MICROTHREAD_NAME overrides the default
# name ("outbuf") passed to xlogger::make_thread when the imp starts.
Option = MICROTHREAD_NAME String; # PUBLIC.
# What make_outbuf_egg returns:  calling it yields our Exports plus a
# function which accepts our Imports and the run/end guns.
Outbuf_Egg = Void -> (Exports, (Imports, Run_Gun, End_Gun) -> Void); # PUBLIC.
# Main body of the imp microthread.  Loops forever, each time either
#
#   o  exiting the microthread because end_gun' fired, or
#   o  draining everything currently in outbuf_q and acting on it.
#
# The fields of the argument record are visible throughout the helpers below.
#
fun run {
	    me:		Outbuf_State,						# Our mutable state.
	    imports:	Imports,						# Ximps to which we send requests.
	    to:		Replyqueue,						# The name makes   foo::pass_something(imp) to {. ... }   syntax read well.
	    end_gun':	End_Gun,						# We shut down the microthread when this fires.
	    outbuf_q:	Outbuf_Q,						# Pleas queued for us by our clients.
	    socket:	sok::Socket (X, sok::Stream(sok::Active))		# Socket we write to.
	}
    =
    serve ()
    where
	# Invoked when end_gun' fires.
	#
	# Note that we deliberately do NOT do
	#
	#     sok::close socket;
	#
	# here (Reppy did).  We want to be able to kill off and
	# recreate impnets freely without affecting the xserver
	# session.  Outbuf USES the socket but does not OWN it:
	# some external agent hands it to us already opened, and
	# closing it when/if desired is that agent's job.
	#
	fun exit_microthread ()
	    =
	    thread_exit { success => TRUE };					# Will not return.

	# Collect the vectors of the longest leading run of
	# SEND_BYTEVECTOR[S] pleas.  Returns the unconsumed pleas
	# (empty, or starting with a FLUSH) together with the
	# collected vectors in their original order.  'gathered'
	# accumulates the vectors in reverse.
	#
	fun gather (SEND_BYTEVECTOR  one  ! remaining,  gathered)   =>   gather (remaining,  one ! gathered);
	    gather (SEND_BYTEVECTORS many ! remaining,  gathered)   =>   gather (remaining,  (reverse many) @ gathered);
	    gather (remaining,                          gathered)   =>   (remaining,  reverse gathered);
	end;

	# Act on a batch of pleas taken from the mailqueue.
	# Adjacent send-pleas are concatenated and written
	# to the socket with a single send_vector call.
	#
	fun process_pleas pleas
	    =
	    case pleas
		#
		[]
		    =>
		    ();

		[ SEND_BYTEVECTOR single ]
		    =>
		    skj::send_vector (socket, single);

		[ SEND_BYTEVECTORS several ]
		    =>
		    skj::send_vector (socket, (v1u::cat several));

		FLUSH notify ! remaining
		    =>
		    {   notify ();							# An upstream caller can use this to verify that all output to given point has been flushed beyond outbuf.
			process_pleas remaining;
		    };

		_
		    =>
		    {   (gather (pleas, []))  ->  (remaining, batch);
			#
			skj::send_vector (socket, (v1u::cat batch));			# Not actually a blocking call, provided that I/O redirection is in effect. (Which it will be, except briefly early in bootstrap.)
			process_pleas remaining;
		    };
	    esac;

	# Outer loop for the imp.
	#
	fun serve ()
	    =
	    {   do_one_mailop' to [
		    #
		    (end_gun'                          ==>  exit_microthread),
		    (take_all_from_mailqueue' outbuf_q ==>  process_pleas)
		];

		serve ();
	    };
    end;										# fun run
# Root fn of the imp microthread.  Note currying:  make_outbuf_egg
# supplies reply_oneshot, and the trailing () is supplied when the
# microthread starts.
#
# Creates our mailqueue and the exported functions which feed it,
# reports (me_slot, exports) back through reply_oneshot, waits for
# the phase-3 record to arrive in me_slot, waits for run_gun' to
# fire, and then enters run().
#
fun startup  (reply_oneshot: Oneshot_Maildrop( (Me_Slot(X), Exports) ))  ()
    =
    {   outbuf_q =  make_mailqueue (get_current_microthread()) : Outbuf_Q;
	#
	fun put_value (vector: v1u::Vector)						# PUBLIC.
	    =
	    put_in_mailqueue (outbuf_q, SEND_BYTEVECTOR vector);

	fun put_values (vectors: List(v1u::Vector))					# PUBLIC.
	    =
	    put_in_mailqueue (outbuf_q, SEND_BYTEVECTORS vectors);

	fun flush_outbuf (signal_fn: Void -> Void)					# PUBLIC.
	    =
	    put_in_mailqueue (outbuf_q, FLUSH signal_fn);

	me_slot =  make_mailslot () : Me_Slot(X);
	#
	exports =  { xsequencer_to_outbuf => { put_value, put_values, flush_outbuf } };

	to      =  make_replyqueue();

	put_in_oneshot (reply_oneshot, (me_slot, exports));				# Return value from outbuf_egg'().

	(take_from_mailslot  me_slot)							# Imports from outbuf_egg'().
	    ->
	    { me, imports, run_gun', end_gun', socket };

	block_until_mailop_fires  run_gun';						# Wait for the starting gun.

	run { me, outbuf_q, imports, to, end_gun', socket };				# Will not return.
    };
# Fold the caller-supplied options into the default settings.
# 'name' is the default microthread name.  Each MICROTHREAD_NAME
# option overrides it, so with several such options the last one wins.
#
fun process_options (options: List(Option), { name })
    =
    { name => pick_name (options, name) }
    where
	# Walk the option list, remembering the most recent name seen.
	#
	fun pick_name ([],                          current)   =>   current;
	    pick_name (MICROTHREAD_NAME n ! rest,   _      )   =>   pick_name (rest, n);
	end;
    end;
##########################################################################################
# PUBLIC.
#
# Three-phase construction of the imp:
#
#   PHASE 1:  make_outbuf_egg (socket, options) builds our state and applies 'options'.
#   PHASE 2:  Calling the returned egg starts our microthread and yields our Exports.
#   PHASE 3:  Calling the function returned alongside the Exports hands us our
#             Imports and guns;  the microthread then waits for Run_Gun to fire.
#
fun make_outbuf_egg									# PUBLIC. PHASE 1: Construct our state and initialize from 'options'.
	(
	  socket:	sok::Socket (X, sok::Stream(sok::Active)),
	  options:	List(Option)
	)
    =
    {   (process_options (options, { name => "outbuf" }))
	    ->
	    { name };

	me =  REF ();

	fun hatch ()									# PUBLIC. PHASE 2: Start our microthread and return our Exports to caller.
	    =
	    {   reply_oneshot =  make_oneshot_maildrop() : Oneshot_Maildrop( (Me_Slot(X), Exports) );
		#
		xlogger::make_thread  name  (startup reply_oneshot);			# Note that startup() is curried.

		(get_from_oneshot  reply_oneshot)  ->  (me_slot, exports);

		fun phase3								# PUBLIC. PHASE 3: Accept our Imports, then wait for Run_Gun to fire.
			(
			  imports:	Imports,
			  run_gun':	Run_Gun,
			  end_gun':	End_Gun
			)
		    =
		    put_in_mailslot (me_slot, { me, imports, run_gun', end_gun', socket });

		(exports, phase3);
	    };

	hatch;
    };
}; # package outbuf_ximp
end;
# fun out_msg_to_string FLUSH_OUTBUF
# =>
# "OutFlush";
#
# out_msg_to_string SHUT_DOWN_OUTBUF
# =>
# "OutQuit";
#
# out_msg_to_string (ADD_TO_OUTBUF v)
# =>
# { prefix_to_show
# =
# byte::unpack_string_vector
# (vector_slice_of_one_byte_unts::make_slice
# (v, 0, max_chars_to_trace_per_send)
# );
#
# case max_chars_to_trace_per_send
# #
# THE n => cat [ "Sent to X server: ", string_to_hex prefix_to_show,
# "... == \"", string_to_ascii prefix_to_show,
# "\"... (", int::to_string (v1u::length v), " bytes)"
# ];
#
# NULL => cat [ "Sent to X server: ", string_to_hex prefix_to_show,
# " == \"", string_to_ascii prefix_to_show,
# "\" (", int::to_string (v1u::length v), " bytes)"
# ];
# esac;
# };
# end;