


## xsocket.pkg
#
# Manage binary socket I/O to an X server for an X client.
#
# Motivation
# ==========
#
# From a coding point of view, the most natural form of
# network client-server interaction is the "remote procedure
# call" (RPC): The client process fires off a network packet
# with a request and then waits for the reply packet from the
# server process.
#
# Unfortunately, the RPC approach can easily slow computation
# to a relative crawl: Memory access times on contemporary
# hardware are measured in nanoseconds, but network round-trip
# times are measured in millions of nanoseconds. Even on today's
# speedy hardware, slowing down computation by a factor of a
# million turns snappy apps into dogs.
#
# Consequently the X Protocol for network communication
# between X client and X server is designed to eliminate
# network round trips where possible and overlap them the
# rest of the time: Instead of firing off a request and
# then sitting idly waiting, the application continues
# to fire off requests, handling the server replies later
# as they trickle back in.
#
# The downside of this approach is that a good deal more
# bookkeeping is required on the client side; a log of
# replies expected but not yet received must be maintained,
# along with a record of what to do with each reply when it
# does arrive, and perhaps what to do if the X server returns
# an error message instead of the expected reply. We must
# also handle the case where no reply at all arrives.
# (Networks are unreliable!)
#
# Our core task in xsocket.pkg is to handle this bookkeeping.
# Higher-level application code sends us requests together
# with code to handle the eventual server replies and/or
# error messages, and we take care of firing off the requests,
# logging the reply-handling code, matching up X server replies
# with logged reply handlers, invoking those handlers, and
# logging error conditions such as lost replies.
#
#
#
# Details
# =======
#
# This code implements the low-level I/O of the X-protocol,
# sending and receiving actual bytestrings from the socket
# connected to the X server.
#
# This includes batching multiple outgoing requests
# per network packet, breaking the incoming bytestream
# into individual replies, matching replies to outstanding
# requests, and collapsing multiple expose events into
# single messages for ease of later processing.
#
# We do not here handle the work of actually encoding and
# decoding wire-format binary bytestrings; those tasks
# are handled by
#
# src/lib/x-kit/xclient/pkg/wire/value-to-wire.pkg# src/lib/x-kit/xclient/pkg/wire/wire-to-value.pkg#
# Together with those two packages, xsocket.pkg
# provides the X server communication layer used by the
# various x-kit imps (server threads) such as:
#
# src/lib/x-kit/xclient/pkg/wire/display.pkg# src/lib/x-kit/xclient/pkg/wire/sendevent-to-wire.pkg#
# src/lib/x-kit/xclient/pkg/window/color-spec.pkg# src/lib/x-kit/xclient/pkg/window/cursors.pkg# src/lib/x-kit/xclient/pkg/window/xsession.pkg# src/lib/x-kit/xclient/pkg/window/draw-imp.pkg# src/lib/x-kit/xclient/pkg/window/font-imp.pkg# src/lib/x-kit/xclient/pkg/window/pen-to-gcontext-imp.pkg# src/lib/x-kit/xclient/pkg/window/cs-pixmap.pkg# src/lib/x-kit/xclient/pkg/window/keymap-imp.pkg# src/lib/x-kit/xclient/pkg/window/rw-pixmap.pkg# src/lib/x-kit/xclient/pkg/window/selection-imp.pkg# src/lib/x-kit/xclient/pkg/window/window.pkg#
# src/lib/x-kit/xclient/pkg/iccc/atom-imp.pkg# src/lib/x-kit/xclient/pkg/iccc/window-property.pkg# src/lib/x-kit/xclient/pkg/iccc/atom.pkg#
#
# In this file we implement the xsession inbuf,
# outbuf, sequencer and xbuf imps. Client code
# interacts with us mainly by using one of the
# sequencer-imp entrypoints
#
# send_xrequest
# send_xrequest_and_verify_success
# send_xrequest_and_read_reply
# sent_xrequest_and_read_replies
# send_xrequest_and_handle_exposures
#
# query_best_size
# query_colors
# query_font
# query_pointer
# query_tree
# query_text_extents
#
# to submit a request or query to the X server.
#
# For the big picture see the imp dataflow diagram in
#
# src/lib/x-kit/xclient/pkg/window/xsession.pkg#
# NOTE: the implementation of 'close' doesn't really work,
# since the socket may end up being closed before the
# output buffer is actually flushed (race condition). XXX BUGGO FIXME
# Compiled by:
# src/lib/x-kit/xclient/xclient-internals.sublib# Compiled by:
# src/lib/x-kit/xclient/xclient-internals.sublib### "The X server has to be the biggest program
### I've ever seen that doesn't do anything for you."
###
### -- Ken Thompson
# event_types is from src/lib/x-kit/xclient/pkg/wire/event-types.pkg # xerrors is from src/lib/x-kit/xclient/pkg/wire/xerrors.pkgstipulate
include threadkit; # threadkit is from src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg #
package un = unt; # unt is from src/lib/std/unt.pkg# package wv8 = rw_vector_of_one_byte_unts; # rw_vector_of_one_byte_unts is from src/lib/std/src/rw-vector-of-one-byte-unts.pkg package psx = posix_1003_1b; # posix_1003_1b is from src/lib/std/src/posix-1003.1b/posix-1003-1b.pkg package e2s = xerror_to_string; # xerror_to_string is from src/lib/x-kit/xclient/pkg/to-string/xerror-to-string.pkg package v8 = vector_of_one_byte_unts; # vector_of_one_byte_unts is from src/lib/std/src/vector-of-one-byte-unts.pkg package rse = retry_syscall_on_eintr; # retry_syscall_on_eintr is from src/lib/std/src/threadkit/posix/retry-syscall-on-eintr.pkg package v2w = value_to_wire; # value_to_wire is from src/lib/x-kit/xclient/pkg/wire/value-to-wire.pkg package w2v = wire_to_value; # wire_to_value is from src/lib/x-kit/xclient/pkg/wire/wire-to-value.pkg #
package xg = xgeometry; # xgeometry is from src/lib/std/2d/xgeometry.pkg package xtr = xlogger; # xlogger is from src/lib/x-kit/xclient/pkg/stuff/xlogger.pkg #
trace = xtr::log_if xtr::io_logging; # Conditionally write strings to tracing.log or whatever.
herein
package xsocket
: (weak) Xsocket # Xsocket is from src/lib/x-kit/xclient/pkg/wire/xsocket.api {
exception LOST_REPLY;
exception ERROR_REPLY xerrors::Xerror;
max_bytes_per_socket_write = 2048;
# Client pleas to sequencer: by clients
#
Plea_Mail
= PLEA_FLUSH
| PLEA_QUIT
| PLEA v8::Vector
| PLEA_AND_CHECK (v8::Vector, Mailslot(Reply_Mail))
| PLEA_REPLY (v8::Vector, Mailslot(Reply_Mail))
| PLEA_REPLIES (v8::Vector, Mailslot(Reply_Mail), (v8::Vector -> Int))
| PLEA_EXPOSURES (v8::Vector, Oneshot_Maildrop (Void -> List(xg::Box) ))
# Replies from the sequencer to client requests
#
also
Reply_Mail
= REPLY_LOST # The reply was lost somewhere in transit.
| REPLY v8::Vector # A normal reply.
| REPLY_ERROR v8::Vector # The server returned an error message.
;
# Messages from the sequencer to the output buffer
#
Outbuf_Mail
= FLUSH_OUTBUF # Write buffer contents to X server socket.
| SHUT_DOWN_OUTBUF # Shut down.
| ADD_TO_OUTBUF v8::Vector # Add bytestring to buffer.
;
Xsocket
=
XSOCKET
{
xsocket_id: Ref( Void ), # The usual trick -- REFs are equal to themselves and
# nothing else, hence work fine as unique identifiers.
xevent_slot: Mailslot( event_types::x::Event ),
plea_slot: Mailslot( Plea_Mail ),
xerror_slot: Mailslot( (un::Unt, v8::Vector) ),
flush: Void -> Void,
close: Void -> Void
};
empty_v
=
v8::from_list [];
# Time to wait before flushing a non-empty output buffer
#
flush_time_out'
=
timeout_in' (time::from_milliseconds 50);
# +DEBUG
max_chars_to_trace_per_send
=
NULL; # Show complete message.
# THE 4; # First four bytes -- this is what Reppy had.
max_chars_to_trace_per_read
=
NULL; # Show complete message.
# THE 8; # First eight bytes -- this is what Reppy had.
fun new_buf size
=
rw_vector_of_one_byte_unts::make_rw_vector (size, 0u0);
# Convert "abc" -> "61.62.63." etc:
#
fun string_to_hex s
=
string::translate
(fn c = number_string::pad_left '0' 2 (int::format number_string::HEX (char::to_int c)) + ".")
s;
# As above, starting with byte-vector:
#
fun bytes_to_hex bytes
=
string_to_hex (byte::unpack_string_vector(vector_slice_of_one_byte_unts::make_slice (bytes, 0, NULL)));
# Show printing chars verbatim, everything
# else as '.', per hexdump tradition:
#
fun string_to_ascii s
=
string::translate
(fn c = char::is_print c ?? string::from_char c :: ".")
s;
# As above, starting with byte-vector:
#
fun bytes_to_ascii bytes
=
string_to_ascii (byte::unpack_string_vector(vector_slice_of_one_byte_unts::make_slice (bytes, 0, NULL)));
fun out_msg_to_string FLUSH_OUTBUF
=>
"OutFlush";
out_msg_to_string SHUT_DOWN_OUTBUF
=>
"OutQuit";
out_msg_to_string (ADD_TO_OUTBUF v)
=>
{ prefix_to_show
=
byte::unpack_string_vector
(vector_slice_of_one_byte_unts::make_slice
(v, 0, max_chars_to_trace_per_send)
);
case max_chars_to_trace_per_send
#
THE n
=>
cat [ "Sent to X server: ", string_to_hex prefix_to_show,
"... == \"", string_to_ascii prefix_to_show,
"\"... (", int::to_string (v8::length v), " bytes)"
];
NULL
=>
cat [ "Sent to X server: ", string_to_hex prefix_to_show,
" == \"", string_to_ascii prefix_to_show,
"\" (", int::to_string (v8::length v), " bytes)"
];
esac;
};
end;
# -DEBUG
##########################################################################################
# X socket input buffer imp.
#
# Here we monitor the input stream from the X-server
# socket and break it up into individual messages which
# are sent on out_slot to be unmarshalled and routed
# by the sequencer.
#
# We get three kinds of messages from the X server:
#
# o Replies to requests we have sent. Always at least 32 bytes long.
# o Error messages. Always exactly 32 bytes long.
# o Events. Always exactly 32 bytes long.
#
# The first byte of the message distinguishes the three types.
#
# For more details see (e.g.) p1 "1 Protocol Formats" in:
#
# http://mythryl.org/pub/exene/X-protocol-R6.pdf
#
# Our task here is to repetitively read one complete message
# from the X server socket (which on a reply means reading any
# extra databytes) and then forward to the sequencer a pair
#
# (message-bytecode, message-bytes)
#
# where 'message-bytecode' is the first byte from the message
# and message-bytes is the complete message, including bytecode.
#
fun inbuf_imp (out_slot, socket) ()
=
{ std_msg_size = 32;
stipulate
fun is_intr err
=
err == psx::error::intr; # == posix EINTR.
# Read at most 'maxbytes' bytes from
# X server socket without blocking,
# return as a byte-vector.
#
# This needs to be a nonblocking read because
# if we block we lock up the whole GUI:
#
fun receive_vector_nonblocking (arg as (socket, maxbytes))
=
case (socket::receive_vector_nonblocking arg)
#
THE vector # We got some bytes -- return them.
=>
vector;
NULL => # We got EWOULDBLOCK/EAGAIN or EINPROGRESS, so wait and retry.
{ sleep_for (time::from_milliseconds 50); # Wait. (Ugly but servicable -- see note [2] at bottom of file.)
receive_vector_nonblocking arg; # Retry.
};
esac;
# Read exactly n bytes from the X server socket
# and return it as a byte-vector:
#
fun read_vector (bytes_to_read, header)
=
read (bytes_to_read, header)
where
fun read (0, [result_bytevector])
=>
result_bytevector;
read (0, bytevectors)
=>
v8::cat (list::reverse bytevectors);
read (remaining_bytes_to_read, result_bytevectors)
=>
{ bytevector = receive_vector_nonblocking (socket, remaining_bytes_to_read); # See note [1] at bottom of file.
case (v8::length bytevector)
#
0 => raise exception FAIL "Socket closed"; # We need a more graceful way to signal that the socket has closed. XXX BUGGO FIXME
#
bytes_read
=>
read ( remaining_bytes_to_read - bytes_read,
bytevector ! result_bytevectors
);
esac;
};
end;
end;
herein
fun get_msg ()
=
{ msg = read_vector (std_msg_size, []);
case (v8::get (msg, 0))
#
0u1 =>
# Reply -- may need to read additional data bytes.
#
# Byte 0 contains the 'Reply' bytecode (0u1).
#
# Bytes 1-4 contain the number of extra 32-bit words
# of data following the stock 32-byte header.
{
extra_dwords
=
large_unt::to_int_x (pack_big_endian_unt1::get_vec (msg, 1));
{ code => 0u1,
#
msg => (extra_dwords > 0)
?? read_vector (extra_dwords * 4, [msg]) # "* 4" because we measure in bytes but X protocol measures in 32-bit words.
:: msg
};
};
k =>
# Error or event:
#
{ code => k, msg };
esac;
};
end;
# +DEBUG
# Tracelogging version of above:
#
get_msg
=
.{ my (result as { code, msg } )
=
get_msg ();
xlogger::log_if xlogger::io_logging
.{ prefix_to_show
=
byte::unpack_string_vector
(vector_slice_of_one_byte_unts::make_slice (msg, 0, max_chars_to_trace_per_read));
case max_chars_to_trace_per_read
#
THE n
=>
cat [ "Read from X server: code=", one_byte_unt::to_string code,
" len=", int::to_string (v8::length msg),
" body=", string_to_hex prefix_to_show,
"... == \"", string_to_ascii prefix_to_show,
"\"..."
];
NULL
=>
cat [ "Read from X server: code=", one_byte_unt::to_string code,
" len=", int::to_string (v8::length msg),
" body=", string_to_hex prefix_to_show,
" == \"", string_to_ascii prefix_to_show,
"\""
];
esac;
};
result;
};
# -DEBUG
fun loop ()
=
for (;;) {
#
give (out_slot, get_msg());
};
loop ()
except
_ = thread_done ();
};
##########################################################################################
# X socket output buffer imp.
#
# It is more efficient to send a few large
# network packets than many small ones
# (due to ethernet minimum packet sizes,
# per-packet handling overhead etc) so
# here we accumulate multiple X server
# requests per socket write.
#
# We flush our buffer contents to the socket
# after 50 milliseconds or when the buffer
# contents hit 2K, whichever comes first:
#
fun outbuf_imp (in_slot, socket) ()
=
loop ([], 0)
where
fun shut_down_outbuf_imp ()
=
{ socket::close socket;
thread_done ();
};
fun flush_outbuf strings
=
socket_junk::send_vector (socket, v8::cat (reverse strings));
# +DEBUG
# Tracelogging version of above:
#
flush_outbuf
=
fn strs
=
{ xlogger::log_if xlogger::io_logging
.{ cat [
"Flush: ", int::to_string (list::length strs), " msgs, ",
int::to_string (list::fold_left (fn (s, n) = v8::length s + n) 0 strs), " bytes."
];
};
flush_outbuf strs;
};
# -DEBUG
fun add_to_outbuf (string, (strings, bytes_in_buf))
=
{ added_bytes = v8::length string;
if (added_bytes + bytes_in_buf > max_bytes_per_socket_write)
#
flush_outbuf strings;
([string], added_bytes);
else
(string ! strings, added_bytes + bytes_in_buf);
fi;
};
fun print_msg msg
=
{ xlogger::log_if xlogger::io_logging
.{ cat ["outbuf_imp::loop: ", out_msg_to_string msg]; };
msg;
};
fun loop (outbuf, bytes_in_buf)
=
{ xlogger::log_if xlogger::io_logging .{
cat [ "outbuf_imp::loop: waiting ", int::to_string (list::length outbuf)];
};
case outbuf
#
[] =>
# Buffer is empty, so no need to
# flush buffer on timeout; just
# wait for a command:
#
case (print_msg (take in_slot))
#
FLUSH_OUTBUF => loop([], 0); # Buffer empty, so flush is a no-op.
ADD_TO_OUTBUF string => loop([string], v8::length string);
SHUT_DOWN_OUTBUF => shut_down_outbuf_imp ();
esac;
strings =>
# Read and execute command; if no command
# arrives within 50ms, write buffer contents
# to X server socket:
#
select [
#
(take' in_slot ==> print_msg)
==>
fn FLUSH_OUTBUF => { flush_outbuf strings; loop([], 0); };
SHUT_DOWN_OUTBUF => { flush_outbuf strings; shut_down_outbuf_imp(); };
#
ADD_TO_OUTBUF string => loop (add_to_outbuf (string, (outbuf, bytes_in_buf)));
end,
flush_time_out'
==>
(fn _ = { flush_outbuf strings;
loop([], 0);
}
)
];
esac;
};
end; # fun outbuf_imp
##########################################################################################
# Sequencer imp.
#
# The sequencer is responsible for matching
# replies read from the X with requests sent
# to it.
#
# All requests to the X-server go through the sequencer,
# as do all messages from the X-server.
#
# The sequencer communicates on five fixed channels:
#
# plea_slot -- request messages from clients
# from_x_slot -- reply, error and event messages from the X server (via the input buffer)
# to_x_slot -- requests messages to the X server (via the output buffer)
# xevent_slot -- X-events to the X-event buffer (and thence to clients)
# error_sink_slot -- errors to the error handler
#
# In addition, the sequencer sends replies
# to clients on the reply channel that was
# bundled with the request.
#
# We maintain a pending-reply queue of requests sent
# to the X server for which replies are expected but
# not yet received.
# We represent it using the usual two-list arrangement,
# writing new entries to the rear list while reading them
# from the front list; when the front list is empty we
# reverse the rear list and make it the new front list.
#
#
stipulate
Pending_Reply
#
= ERROR_CHECK (un::Unt, Mailslot( Reply_Mail ))
| ONE_REPLY (un::Unt, Mailslot( Reply_Mail ))
| MULTI_REPLY (un::Unt, Mailslot( Reply_Mail ), (v8::Vector -> Int), List( v8::Vector ))
| EXPOSURE_REPLY (un::Unt, Oneshot_Maildrop( Void -> List( xg::Box ) ))
;
#
# The kind of reply that is pending
# for an outstanding request in the
# outstanding request queue.
#
# We use unsigneds to represent the
# sequence numbers.
#
# ONE_REPLY is the workhorse call:
# A request generating a single reply.
#
# MULTI_REPLY is a currently unused call
# supporting multiple responses to a single request:
# we accumulate responses until the (v8::Vector -> Int)
# function argument ("remaining") returns 0.
# (The fourth slot is just the reply accumulator.)
# +DEBUG
fun seqn_to_string n # "seqn" == "sequence number"
=
un::format number_string::DECIMAL n;
fun queue_element_to_string (ERROR_CHECK (n, _)) => " ERROR_CHECK seqn==" + (seqn_to_string n);
queue_element_to_string (ONE_REPLY (n, _)) => " ONE_REPLY seqn==" + (seqn_to_string n);
queue_element_to_string (MULTI_REPLY (n, _, _, _)) => " MULTI_REPLY seqn==" + (seqn_to_string n);
queue_element_to_string (EXPOSURE_REPLY (n, _)) => " EXPOSURE_REPLY seqn==" + (seqn_to_string n);
end;
fun pending_reply_queue_to_string ([], [])
=>
"(pending reply queue is empty)";
pending_reply_queue_to_string (front, rear)
=>
{
fun queue_to_strings ([], l) => reverse l;
queue_to_strings (x ! r, l) => queue_to_strings (r, ((queue_element_to_string x) + "; ") ! l);
end;
"(" + (cat (queue_to_strings (front @ (reverse rear), []))) + ")";
};
end;
# -DEBUG
fun seqn_of (ERROR_CHECK (seqn, _ )) => seqn;
seqn_of (ONE_REPLY (seqn, _ )) => seqn;
seqn_of (MULTI_REPLY (seqn, _, _, _)) => seqn;
seqn_of (EXPOSURE_REPLY (seqn, _ )) => seqn;
end;
# Spawn throw-away thread to deliver
# X server reply to requesting client
# application thread. This avoids
# blocking our own thread until the
# the client thread is ready:
#
fun send_reply arg
=
{
make_thread "xsocket reply" .{ give arg; };
();
};
# Spawn throw-away thread to deliver
# multiple X server replies. This is
# to handle the currently-unused MULTI_REPLY:
#
fun send_replies (slot, replies)
=
{ fun loop []
=>
();
loop (s ! rest)
=>
{ give (slot, REPLY s);
loop rest;
};
end;
make_thread "xsocket replies" .{
#
loop (reverse replies);
};
();
};
fun add_to_pending_reply_queue (pending_reply, (front, rear))
=
# { trace .{ sprintf "xsocket::add_to_pending_reply_queue(%s)/TOP pending_reply_queue = %s" (queue_element_to_string pending_reply) (pending_reply_queue_to_string (front,rear)); }; result =
(front, pending_reply ! rear);
# trace .{ sprintf "xsocket::add_to_pending_reply_queue(%s)/BOT pending_reply_queue = %s" (queue_element_to_string pending_reply) (pending_reply_queue_to_string result); }; result;
# };
# Search pending-reply queue for the
# sequence number n, which is from the
# latest X server message received.
#
# If we have any pending replies with
# lower sequence numbers they must
# correspond to lost X server requests,
# so we do the best we can with them
# and then drop them from the queue.
#
# We return the pair
#
# { found_it, updated_queue }
#
# where:
#
# updated_queue
# is the updated queue.
#
# found_it
# is TRUE iff the head
# of updated_queue has
# sequence number n.
#
fun get_pending_reply_n (n, q)
=
drop_outdated_pending_replies q
where
fun drop_outdated_reply (ERROR_CHECK(_, slot)) => send_reply (slot, REPLY empty_v);
drop_outdated_reply (ONE_REPLY (_, slot)) => send_reply (slot, REPLY_LOST);
drop_outdated_reply (MULTI_REPLY(_, slot, _, [] )) => send_reply (slot, REPLY_LOST);
drop_outdated_reply (MULTI_REPLY(_, slot, _, replies)) => send_replies (slot, replies);
drop_outdated_reply (EXPOSURE_REPLY(_, sync_1shot))
=>
set (sync_1shot, fn () = raise exception LOST_REPLY);
end;
fun drop_outdated_pending_replies (q' as ([], []))
=>
{ found_it => FALSE, updated_queue => q' };
drop_outdated_pending_replies ([], rear)
=>
drop_outdated_pending_replies (reverse rear, []);
drop_outdated_pending_replies (q' as ((pending_reply ! r), rear))
=>
{ seqn = seqn_of pending_reply;
if (seqn < n)
#
drop_outdated_reply pending_reply;
#
drop_outdated_pending_replies (r, rear);
else
seqn > n ?? { found_it => FALSE, updated_queue => q' }
:: { found_it => TRUE, updated_queue => q' };
fi;
};
end;
end;
# Extract the pending-reply queue entry
# with the sequence number n.
#
# If all of the expected X server replies
# for that entry have been received then
# send the extracted reply to the requesting
# client.
#
fun handle_reply_message (seqn, reply, pending_reply_queue)
=
case (get_pending_reply_n (seqn, pending_reply_queue))
#
{ found_it => TRUE, updated_queue => (ONE_REPLY(_, slot) ! r, rear) }
=>
{ send_reply (slot, REPLY reply);
(r, rear);
};
{ found_it => TRUE, updated_queue => (MULTI_REPLY (seqn, slot, remaining, replies) ! rest, rear) }
=>
if (remaining reply == 0)
#
send_replies (slot, reply ! replies);
(rest, rear);
else
( MULTI_REPLY (seqn, slot, remaining, reply ! replies) ! rest,
rear
);
fi;
# xgripe is from src/lib/x-kit/xclient/pkg/stuff/xgripe.pkg _ =>
{ # Debug support:
#
trace .{ sprintf "IMPOSSIBLE ERROR: xsocket::handle_reply_message(seqn==%s, reply x=%s (%d bytes)...): BOGUS PENDING REPLY QUEUE, queue =%s"
(seqn_to_string seqn)
(bytes_to_hex reply)
(v8::length reply)
(pending_reply_queue_to_string pending_reply_queue);
};
xgripe::impossible (sprintf "XERROR: xsocket::handle_reply_message(seqn==%s,...): BOGUS PENDING REPLY QUEUE" (seqn_to_string seqn));
};
esac;
# Extract the pending-reply queue entry
# with seqence number n:
#
fun handle_expose_message (n, reply, pending_reply_queue)
=
{
case (get_pending_reply_n (n, pending_reply_queue))
#
{ found_it => TRUE, updated_queue => (EXPOSURE_REPLY(_, sync_1shot) ! rest, rear) }
=>
{ set (sync_1shot, fn () = reply);
(rest, rear);
};
# For now, just drop it.
# When the gc-server supports graphics-exposures,
# these shouldn't happen: XXX BUGGO FIXME
#
_ =>
{
pending_reply_queue;
};
esac;
/* +DEBUG
(dumpPendingQ (n, q);
xgripe::impossible "ERROR: xsocket::handle_expose_message: bogus pending reply queue]")
-DEBUG */
};
# Extract the pending-reply queue entry
# with seqence number n (corresponding
# to the given error message):
#
fun handle_error_message (n, err, pending_reply_queue)
=
case (get_pending_reply_n (n, pending_reply_queue))
#
{ found_it => TRUE, updated_queue => (ERROR_CHECK(_, slot) ! rest, rear) }
=>
{ send_reply (slot, REPLY_ERROR err);
(rest, rear);
};
{ found_it => TRUE, updated_queue => (ONE_REPLY(_, slot) ! rest, rear) }
=>
{ send_reply (slot, REPLY_ERROR err);
(rest, rear);
};
{ found_it => TRUE, updated_queue => (MULTI_REPLY(_, slot, _, _) ! rest, rear) }
=>
{ send_reply (slot, REPLY_ERROR err);
(rest, rear);
};
{ found_it => TRUE, updated_queue => (EXPOSURE_REPLY(_, sync_1shot) ! rest, rear) }
=>
{ set (sync_1shot, fn () = raise exception ERROR_REPLY (wire_to_value::decode_error err));
(rest, rear);
};
{ found_it => FALSE, updated_queue => pending_reply_queue' }
=>
pending_reply_queue';
_ =>
/* DEBUG */ { trace .{ sprintf "IMPOSSIBLE ERROR: xsocket::handle_error_message(seqn==%s: BOGUS PENDING REPLY QUEUE, queue =%s" (seqn_to_string n) (pending_reply_queue_to_string pending_reply_queue); };
xgripe::impossible "ERROR: xsocket::handle_error_message: bogus pending reply queue]";
/* DEBUG */ };
esac;
fun handle_event_message (n, pending_reply_queue)
=
case (get_pending_reply_n (n, pending_reply_queue))
#
{ found_it => TRUE, updated_queue => (ERROR_CHECK(_, slot) ! rest, rear) }
=>
{ send_reply (slot, REPLY empty_v);
(rest, rear);
};
{ found_it, updated_queue => pending_reply_queue' }
=>
pending_reply_queue';
esac;
herein
fun sequencer_imp
(
plea_slot, # Traffic (requests) from client threads.
#
from_x_slot, # Traffic from X server (via buffer thread).
to_x_slot, # Traffic to X server (via buffer thread).
#
to_xbuf_slot, # Traffic (request replies) to client threads.
#
error_sink_slot # Where we send error messages.
)
()
=
sequencer_imp_main_loop
( 0u0, # Last sequence number read from X server.
0u0, # Last sequence number sent to X server.
#
( [], # Pending-reply queue, front.
[] # Pending-reply queue, back.
)
)
where
fun quit ()
=
{ give (to_x_slot, SHUT_DOWN_OUTBUF);
thread_done ();
};
from_x' = take' from_x_slot;
request' = take' plea_slot;
fun send_request (req, (last_seqn_read, last_seqn_sent, pending_reply_queue))
=
{ give (to_x_slot, ADD_TO_OUTBUF req);
#
(last_seqn_read, last_seqn_sent+0u1, pending_reply_queue);
};
fun send_request_and_check ((req, reply_slot), (last_seqn_read, last_seqn_sent, pending_reply_queue))
=
{ n = last_seqn_sent+0u1;
#
give (to_x_slot, ADD_TO_OUTBUF req);
#
(last_seqn_read, n, add_to_pending_reply_queue (ERROR_CHECK (n, reply_slot), pending_reply_queue));
};
fun send_request_reply ((req, reply_slot), (last_seqn_read, last_seqn_sent, pending_reply_queue))
=
{ n = last_seqn_sent+0u1;
give (to_x_slot, ADD_TO_OUTBUF req);
(last_seqn_read, n, add_to_pending_reply_queue (ONE_REPLY (n, reply_slot), pending_reply_queue));
};
fun send_request_replies ((req, reply_slot, remain), (last_seqn_read, last_seqn_sent, pending_reply_queue))
=
{ n = last_seqn_sent+0u1;
give (to_x_slot, ADD_TO_OUTBUF req);
(last_seqn_read, n, add_to_pending_reply_queue (MULTI_REPLY (n, reply_slot, remain, []), pending_reply_queue));
};
fun send_request_exposures ((req, sync_v), (last_seqn_read, last_seqn_sent, pending_reply_queue))
=
{ n = last_seqn_sent+0u1;
give (to_x_slot, ADD_TO_OUTBUF req);
(last_seqn_read, n, add_to_pending_reply_queue (EXPOSURE_REPLY (n, sync_v), pending_reply_queue));
};
# Process all pending client-thread
# requests and then flush outbuf:
#
fun do_pending_pleas imp_state
=
{ imp_state = do_all_pending_pleas imp_state;
give (to_x_slot, FLUSH_OUTBUF);
imp_state;
}
where
fun do_all_pending_pleas imp_state
=
case (nonblocking_take plea_slot)
#
THE PLEA_FLUSH => do_all_pending_pleas imp_state;
THE PLEA_QUIT => quit ();
#
THE (PLEA request) => do_all_pending_pleas (send_request (request, imp_state));
THE (PLEA_AND_CHECK request) => do_all_pending_pleas (send_request_and_check (request, imp_state));
#
THE (PLEA_REPLY request) => do_all_pending_pleas (send_request_reply (request, imp_state));
THE (PLEA_REPLIES request) => do_all_pending_pleas (send_request_replies (request, imp_state));
THE (PLEA_EXPOSURES request) => do_all_pending_pleas (send_request_exposures (request, imp_state));
#
NULL => imp_state;
esac;
end;
# This is the main sequencer_imp loop. We track
# the sequence number of the last message in,
# the sequence number of the last message out,
# and the queue of pending replies.
#
fun sequencer_imp_main_loop
#
(imp_state as (last_seqn_read, last_seqn_sent, pending_reply_queue))
=
{
sequencer_imp_main_loop (
#
select [
#
request' ==> do_plea, # Handle request to X server from an application thread.
from_x' ==> do_from_x # Handle reply/error/event from X server.
]
);
}
where
# Handle a request from a client
# (an app thread on our side):
#
fun do_plea PLEA_FLUSH => do_pending_pleas imp_state;
do_plea PLEA_QUIT => quit();
#
do_plea (PLEA_AND_CHECK request) => do_pending_pleas (send_request_and_check (request, imp_state));
do_plea (PLEA_REPLY request) => do_pending_pleas (send_request_reply (request, imp_state));
do_plea (PLEA_REPLIES request) => do_pending_pleas (send_request_replies (request, imp_state));
do_plea (PLEA_EXPOSURES request) => do_pending_pleas (send_request_exposures (request, imp_state));
#
do_plea (PLEA request)
=>
{ give (to_x_slot, ADD_TO_OUTBUF request);
#
(last_seqn_read, last_seqn_sent+0u1, pending_reply_queue);
};
end;
# Handle a message from the X-server
# -- a reply, event or error:
#
fun do_from_x
{
code: one_byte_unt::Unt, # Opcode -- first byte of X server message.
msg # Entire X server message.
}
=
{
# NOTE: this doesn't work if there are 2**17
# outgoing messages between replies/events.
#
# We need to track (last_seqn_sent - last_seqn_read),
# and if it gets bigger than some reasonable size,
# generate a synchronization (i.e., get_input_focus message). XXX BUGGO FIXME
fun get_seq_n ()
=
{ short_seq_n
=
un::from_large_unt (pack_big_endian_unt16::get_vec (msg, 1));
seqn' = un::bitwise_or
( un::bitwise_and (last_seqn_read, un::bitwise_not 0uxffff),
short_seq_n
);
seqn' < last_seqn_read
?? seqn' + 0ux10000 # NOTE: we should check for (seqn' + 0x10000) > lastReqOut XXX BUGGO FIXME
:: seqn';
};
case code
#
0u0 => { # Error message:
seqn = get_seq_n();
give (error_sink_slot, (seqn, msg));
(seqn, last_seqn_sent, handle_error_message (seqn, msg, pending_reply_queue));
};
0u1 => { # Reply message:
seqn = get_seq_n();
(seqn, last_seqn_sent, handle_reply_message (seqn, msg, pending_reply_queue));
};
0u11 => { # KeymapNotify event:
give (to_xbuf_slot, (code, msg));
( last_seqn_read,
last_seqn_sent,
handle_event_message (last_seqn_read, pending_reply_queue)
);
};
0u13 => { # GraphicsExpose event:
seqn = get_seq_n();
include event_types;
boxes = read_expose_event_train ([], wire_to_value::decode_graphics_expose msg)
where
# The X server sends numbered trains of expose events;
# read a complete train and return it as a list:
#
fun read_expose_event_train (result_list, x::GRAPHICS_EXPOSE { box, count=>0, ... } )
=>
box ! result_list; # DONE.
read_expose_event_train (result_list, x::GRAPHICS_EXPOSE { box, ... } )
=>
case (take from_x_slot) # Read next expose event, add to result list.
#
{ code => 0u13, msg=>s }
=>
read_expose_event_train (box ! result_list, wire_to_value::decode_graphics_expose s);
_ =>
{ xgripe::warning "[xok::sequencer_imp: misleading GraphicsExpose count]";
box ! result_list;
};
esac;
read_expose_event_train _ => raise exception FAIL "Bug: Unsupported case in read_expose_event_train.";
end;
end;
( seqn,
last_seqn_sent,
handle_expose_message (seqn, boxes, pending_reply_queue)
);
};
0u14 => { # NoExpose event:
seqn = get_seq_n();
(seqn, last_seqn_sent, handle_expose_message (seqn, [], pending_reply_queue));
};
_ => { # Other event messages:
seqn = get_seq_n();
give (to_xbuf_slot, (code, msg));
(seqn, last_seqn_sent, handle_event_message (seqn, pending_reply_queue));
};
esac;
}; # fun do_from_x
end; # fun sequencer_imp_main_loop
end; # fun sequencer_imp
end; # stipulate
##########################################################################################
# X event buffer imp.
#
# Here we decode and buffer X events
# from the X-server -- keystrokes,
# mouseclicks, mouse-motions etc.
#
# We also pack expose events.
#
# We communicate via two mailslots as follows:
#
# from_sequencer_slot -- raw messages from the sequencer_imp
# to_widget_slot -- decoded events headed for the appropriate widget.
#
# X events that we send to 'to_widget_slot' get routed by
# xsocket_to_topwindow
# from
# src/lib/x-kit/xclient/pkg/window/xsocket-to-topwindow-router.pkg #
# to the correct topwindow, where they get routed on down that window's widget-tree by
# topwindow_to_widget_router
# from
# src/lib/x-kit/xclient/pkg/window/topwindow-to-widget-router.pkg #
# This machinery mostly gets wired up in display and xsession from (respectively)
#
# src/lib/x-kit/xclient/pkg/wire/display.pkg # src/lib/x-kit/xclient/pkg/window/xsession.pkg #
# -- see the dataflow diagram in top-of-file comments there.
#
fun xbuf_imp (from_sequencer_slot, to_widget_slot)
=
route_p
where
include event_types;
fun decode (opcode, bytestring)
=
{ my (not_via_sendevent, event)
=
wire_to_value::decode_xevent (opcode, bytestring);
# trace .{ sprintf "%s <=== (fun decode(): opcode x=%x%s)" (xevent_to_string::xevent_name event) (one_byte_unt::to_int opcode) (not_via_sendevent ?? "" :: " -- EVENT GENERATED VIA SendEvent") ; };
event;
};
fun pack_expose_events (e as x::EXPOSE { exposed_window_id, ... } )
=>
x::EXPOSE { exposed_window_id, boxes => pack([], e), count => 0 }
where
fun pack (rl, x::EXPOSE { boxes, count=>0, ... } )
=>
boxes@rl;
pack (rl, x::EXPOSE { boxes, ... } )
=>
pack (boxes @ rl, decode (take from_sequencer_slot));
pack (rl, _)
=>
{ xgripe::warning "[xok::xbuf_imp: misleading Expose count]";
rl;
};
end;
end;
pack_expose_events _ => raise exception FAIL "Bug: Unsupported case: pack_expose_events";
end;
fun do_xevent (msg, q)
=
case (decode msg)
#
(e as x::EXPOSE _)
=>
pack_expose_events e ! q;
e => (e ! q);
esac;
take_xevent' = take' from_sequencer_slot;
fun route_p ()
=
loop ([], [])
where
fun loop ([], [])
=>
loop (do_xevent (take from_sequencer_slot, []), []);
loop ([], rear)
=>
loop (reverse rear, []);
loop (front as (x ! rest), rear)
=>
loop (
select [
#
take_xevent'
==>
(fn mail = (front, do_xevent (mail, rear))),
give' (to_widget_slot, x)
==>
.{ (rest, rear); }
]
);
end;
end;
end; # fun xbuf_imp
##########################################################################################
# Create the threads and internal mailslots
# to manage a connection to the X server:
# inbuf_imp, outbuf_imp, xbuf_imp, sequencer_imp...
#
# We assume that the connection request/reply
# has already been dealt with.
#
# This function is called (only) from open_xdisplay in:
#
# src/lib/x-kit/xclient/pkg/wire/display.pkg #
fun make_xsocket socket
=
{
inbuf_to_sequencer_slot = make_mailslot ();
sequencer_to_outbuf_slot = make_mailslot ();
xbuf_to_client_slot = make_mailslot ();
sequencer_to_xbuf_slot = make_mailslot ();
client_to_sequencer_slot = make_mailslot ();
xerror_slot = make_mailslot ();
# expose_strm = make_mailslot ();
fun flush_fn ()
=
give (client_to_sequencer_slot, PLEA_FLUSH);
fun close_fn ()
=
{ xlogger::log_if xlogger::io_logging .{ "close connection."; };
flush_fn ();
give (client_to_sequencer_slot, PLEA_QUIT);
};
/******
make_thread "xok seq" (sequencer_imp (reqStrm, inStrm, outStrm, xevtMsgStrm, errStrm));
make_thread "xok in" ( inbuf_imp ( inStrm, socket));
make_thread "xok out" (outbuf_imp (outStrm, socket));
make_thread "xok buf" (xeventBuffer (xevtMsgStrm, xevtStrm));
******/
/* DEBUG */
xlogger::make_thread "sequencer_imp" (
sequencer_imp
( client_to_sequencer_slot,
inbuf_to_sequencer_slot,
sequencer_to_outbuf_slot,
sequencer_to_xbuf_slot,
xerror_slot
)
);
xlogger::make_thread "inbuf_imp" (inbuf_imp ( inbuf_to_sequencer_slot, socket));
xlogger::make_thread "outbuf_imp" (outbuf_imp (sequencer_to_outbuf_slot, socket));
xlogger::make_thread "xbuf_imp" (xbuf_imp ( sequencer_to_xbuf_slot, xbuf_to_client_slot));
XSOCKET
{ xsocket_id => REF (),
xevent_slot => xbuf_to_client_slot,
plea_slot => client_to_sequencer_slot,
xerror_slot,
flush => flush_fn,
close => close_fn
};
};
fun close_xsocket (XSOCKET { close, ... } )
=
close ();
fun same_xsocket (XSOCKET { xsocket_id=>a, ... }, XSOCKET { xsocket_id=>b, ... } )
=
a == b;
fun send_xrequest (XSOCKET { plea_slot, ... } ) s
=
{
give (plea_slot, PLEA s);
};
# Reply handling in the Client-thread context.
#
# Most processing happens in the xsocket's
# own threads, but any client-relevant exception
# needs to be raised in the context of the
# calling client thread. That is our job here:
#
fun unwrap_reply REPLY_LOST => raise exception LOST_REPLY;
unwrap_reply (REPLY_ERROR s) => raise exception ERROR_REPLY (wire_to_value::decode_error s);
unwrap_reply (REPLY s) => s;
end;
# NOTE: these should be done using a guard mailop eventually XXX BUGGO FIXME
# This is a workhorse call,
# request-with-single-reply:
#
fun send_xrequest_and_read_reply (XSOCKET { plea_slot, ... } ) s
=
{ reply_slot = make_mailslot ();
give (plea_slot, PLEA_REPLY (s, reply_slot));
take' reply_slot
==>
unwrap_reply;
};
# Generate a request to the server and
# check on its successful completion.
#
# The only uses of this I find are:
#
# property::change_property in
# src/lib/x-kit/xclient/pkg/iccc/window-property.pkg #
# font_imp::open_font in
# src/lib/x-kit/xclient/pkg/window/font-imp.pkg #
# In both cases the idea is to wait for
# successful completion of the op before
# continuing.
#
fun send_xrequest_and_verify_success (XSOCKET { plea_slot, ... } ) s
=
{ reply_slot1 = make_mailslot ();
reply_slot2 = make_mailslot ();
give (plea_slot, PLEA_AND_CHECK (s, reply_slot1));
give (plea_slot, PLEA_REPLY (value_to_wire::request_get_input_focus, reply_slot2));
take' reply_slot1
==>
fn (REPLY_ERROR s) => raise exception ERROR_REPLY (wire_to_value::decode_error s);
_ => ();
end;
};
# I cannot find any code which actually uses this:
#
fun sent_xrequest_and_read_replies
(XSOCKET { plea_slot, ... } )
(s, remain)
=
{ reply_slot = make_mailslot ();
give (plea_slot, PLEA_REPLIES (s, reply_slot, remain));
take' reply_slot
==>
unwrap_reply;
};
# This is directly used exactly once,
# by draw_imp::flush_outbuf, and ultimately
# used twice, by DOP_COPY_AREA and DOP_COPY_PLANE
# in draw_imp:
#
fun send_xrequest_and_handle_exposures
(XSOCKET { plea_slot, ... } )
(s, sync_v)
=
{ reply_slot = make_mailslot (); # Um? This is never used. XXX BUGGO FIXME.
give (plea_slot, PLEA_EXPOSURES (s, sync_v));
};
fun flush (XSOCKET { flush, ... } )
=
flush ();
# Generate a mailop to read X events from X server.
# Called (only) from:
#
# src/lib/x-kit/xclient/pkg/window/xsocket-to-topwindow-router.pkg #
fun wait_for_xevent (XSOCKET { xevent_slot, ... } )
=
take' xevent_slot;
# This is called (only) once, from err_handler() in:
# src/lib/x-kit/xclient/pkg/wire/display.pkg fun read_xerror (XSOCKET { xerror_slot, ... } )
=
take xerror_slot;
################################################################################
# X-server queries
#
# This stuff does not depend on access to xsocket
# internals, so it could just as easily be a separate
# package, but it seems simplest to keep it here for now:
# This is a little convenience function to encode
# a query and then decode the reply. It was originally
# in font_imp, it moved here to avoid replication:
#
fun query (encode, decode) xsocket
=
ask
where
send_xrequest_and_read_reply' = send_xrequest_and_read_reply xsocket;
fun ask msg
=
decode (do_mailop (send_xrequest_and_read_reply' (encode msg)))
except
LOST_REPLY => raise exception (xgripe::XERROR "[reply lost]");
ERROR_REPLY err => raise exception (xgripe::XERROR (e2s::xerror_to_string err));
end;
end;
# Some predefined queries based on the above.
# (Maybe we should predefine them all here?)
#
query_best_size = query (v2w::encode_query_best_size, w2v::decode_query_best_size_reply );
query_colors = query (v2w::encode_query_colors, w2v::decode_query_colors_reply );
query_font = query (v2w::encode_query_font, w2v::decode_query_font_reply );
query_pointer = query (v2w::encode_query_pointer, w2v::decode_query_pointer_reply );
query_tree = query (v2w::encode_query_tree, w2v::decode_query_tree_reply );
query_text_extents = query (v2w::encode_query_text_extents, w2v::decode_query_text_extents_reply);
}; # package xsocket
end;
##########################################################################
# NOTES
#
############
# Note [1]
#
# This
#
# v = receive_vector_nonblocking (socket, n);
#
# was originally
#
# v = socket::receive_vector (socket, n);
#
# which didn't work due to EINTR errors from
# the 50Hz setitimer SIGALRM, propagaged up
# as psx::error::intr exceptions.
#
# I don't know why the SML/NJ version did not have this
# problem. Maybe it did. Maybe it didn't and I broke
# something. Maybe it worked on the original Solaris
# OS and this problem manifests only on Linux.
#
# Changing the original code to
#
# v = rse::do_syscall_retry_on_eintr socket::receive_vector (socket, n);
#
# trapped the EINTR-derived exceptions and retried, but
# that call turns off the 50Hz SIGALRM after a retry or
# two, which shuts down all GUI activity until the next
# X event arrives to unblock things.
#
# Either way, the system would be spending a significant
# fraction of its time read-blocked on the X server socket
# even if there are threads ready to run, which sucks.
#
# The current code does nonblocking reads to avoid losing
# valuable cycles sitting blocked on the socket fd. It
# then does a wait and retry. This does increase system
# user-input response latency by the wait time, which is
# suboptimal.
#
#
#
############
# Note [2]
#
# Spinning like this is pretty ugly, albeit fairly cheap, and hugely
# better than blocking on the socket, since this way other threads can
# run while we're waiting.
#
# It would be much nicer to have a dedicated POSIX thread which did a
# select/poll/whatever on the set of fds we're waiting for input from
# and then sets "input available flags in C-side memory.
#
# Our gc-probe logic could then check those flags and ping the scheduler
# to wake threads waiting on such I/O.
#
# Since gc-probe is the only time we respond to such thread I/O opportunities,
# this is a minimum-latency solution, short of having a Mythryl ppread blocked
# and ready to run -- possible only once we have parallel computing support. XXX BUGGO FIXME.


