# io-wait-hostthread.pkg
#
# For general background, see "Overview" comments in
#
#
#     src/lib/std/src/hostthread/io-wait-hostthread.api
#
# Our basic task here is to watch a set of file descriptors
# (typically pipes or sockets) and when an I/O opportunity
# arrives (almost always data now available to read) call
# a corresponding client-provided closure to take advantage
# of that opportunity.
#
# An I/O opportunity can be one of three things:
#
# o Input file descriptor now has data available to be read. (The typical case of interest.)
# o Output file descriptor can now accept a data write. (Much rarer -- usually we just do a blocking write.)
# o Out-of-band-data is available to be read. (Vanishingly rare in practice.)
#
# So our main datastructure requirement is to be able to map
#
# (filedescriptor, action)
#
# pairs to corresponding client-provided closures ("functions").
#
# Since fds are short integers and actions can be represented
# in two bits, we fold fd+action into a single int which can
# then be used to index a stock red-black tree.
#
#
# See also:
#
#
#     src/lib/std/src/hostthread/cpu-bound-task-hostthreads.pkg
#     src/lib/std/src/hostthread/io-bound-task-hostthreads.pkg
#
# Compiled by:
#
src/lib/std/standard.libstipulate
package fil = file__premicrothread; # file__premicrothread is from
src/lib/std/src/posix/file--premicrothread.pkg package hth = hostthread; # hostthread is from
src/lib/std/src/hostthread.pkg package im = int_red_black_map; # int_red_black_map is from
src/lib/src/int-red-black-map.pkg package psx = posixlib; # posixlib is from
src/lib/std/src/psx/posixlib.pkg package tim = time; # time is from
src/lib/std/time.pkg package vu1 = vector_of_one_byte_unts; # vector_of_one_byte_unts is from
src/lib/std/src/vector-of-one-byte-unts.pkg package wio = winix__premicrothread::io; # winix__premicrothread::io is from
src/lib/std/src/posix/winix-io--premicrothread.pkg package wxp = winix__premicrothread::process; # winix__premicrothread::process is from
src/lib/std/src/posix/winix-process--premicrothread.pkgherein
package io_wait_hostthread
: (weak) Io_Wait_Hostthread # Io_Wait_Hostthread is from
src/lib/std/src/hostthread/io-wait-hostthread.api {
#################################################################
# BEGIN type definitions section
# A pipe, recorded as its two endpoint file descriptors.
# In this file it is used only for the private wake-up pipe:
# the server waits on 'infd' and write_to_private_pipe() writes to 'outfd'.
#
Pipe = { infd: psx::File_Descriptor,
outfd: psx::File_Descriptor
};
# One record type for each request
# supported by the server:
#
# Do_Stop: 'reply' is invoked by do_stop() after the server state has been cleared.
#          'per_who' is not read anywhere in this file.
# Do_Echo: do_echo() calls 'reply' on 'what'.
#
Do_Stop = { per_who: String, reply: Void -> Void }; # Edit to suit. The 'reply' thunks will typically send a do_something() request back to the originating hostthread.
Do_Echo = { what: String, reply: String -> Void };
# The next three record types have the same field names as the
# argument records destructured by note_iod_reader / note_iod_writer /
# note_iod_oobder respectively.
#
Do_Note_Iod_Reader
=
{ io_descriptor: wio::Iod,
read_fn: wio::Iod -> Void # Call this fn (closure) on iod whenever input is available on fd.
};
Do_Note_Iod_Writer
=
{ io_descriptor: wio::Iod,
write_fn: wio::Iod -> Void # Call this fn (closure) on iod whenever output is possible on fd.
};
Do_Note_Iod_Oobder
=
{ io_descriptor: wio::Iod,
oobd_fn: wio::Iod -> Void # Call this fn (closure) on iod whenever out-of-band data ("oobd") is available on this fd.
};
# Tagged union of the request records, so that a single
# List(Request) can serve as the request queue.
#
Request = DO_STOP Do_Stop # Union of above record types, so that we can keep them all in one queue.
| DO_ECHO Do_Echo
#
| DO_TEST String
# Nominally temporary debug code; 'String' identifies caller for debug purposes.
# NB: The service_request clause for DO_TEST is currently commented out.
;
# END of type definitions section
#################################################################
#################################################################
# BEGIN MUTABLE STATE SECTION
#
pid = REF 0;								# pid of current process while server is running, otherwise zero.
									# Compared against wxp::get_process_id() to detect a heap image reloaded into a different process.
server_hostthread_is_running = REF FALSE;				# TRUE between server_code() startup and do_stop().
client_fd_count = REF 0;						# Number of file descriptors we are watching for clients.
									# The "for clients" qualifier is because this count does not include the fd for our private pipe().
									# This count is support for is_doing_useful_work ().
client_fns
    =
    REF (im::empty: im::Map( wio::Iod -> Void ));			# When an fd is ready to read, we'll look it up in this and call resulting read_fn on it. (Or similar for writing or out-of-band data.)
									# Keys are produced by fdop_to_index().
private_pipe = REF (NULL: Null_Or(Pipe));				# This is a private pipe we use to wake our server hostthread by sending it a byte.
									# Since it spends all of its time sleeping on a wio::wait_for_io_opportunity (C select()/poll())
									# call, sending it a byte is the simplest and best way to wake it. (See Note[1].)
wait_requests = REF ([]: List( wio::Ioplea ));				# This is the set of file descriptors to watch for I/O opportunities via wio::wait_for_io_opportunity (C select()/poll()).
									# INVARIANT: No two entries on list have same io_descriptor value.
# timeout = REF (tim::from_float_seconds 0.02);				# Set up to timeout at 50Hz.
# THIS VALUE IS ONLY A TEMPORARY DEBUG HACK:
timeout = REF (tim::from_float_seconds 0.50);				# Half-second timeout, i.e. 2Hz.  (The intended production value is 0.02 == 50Hz, see above.)
# timeout = REF (tim::from_float_seconds 0.20);				# Set up to timeout at 5Hz.
per_loop_fns = REF ([]: List( Ref(Void -> Void)) );			# Functions to call once each time around the main outer loop.
									# See note_per_loop_fn/drop_per_loop_fn comments in
									#     src/lib/std/src/hostthread/io-wait-hostthread.api
									# microthread_preemptive_scheduler is from
									#     src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
request_queue = REF ([]: List(Request));				# Queue of pending requests from client hostthreads.  Newest request is at the head.
mutex = hth::make_mutex ();						# These will actually survive a heap save/load cycle, pretty much.
condvar = hth::make_condvar ();						# Broadcast whenever server_hostthread_is_running changes.
# END OF MUTABLE STATE SECTION
#################################################################
# Debug-support function, not used in production code:
# print one line per entry of a wait-request list, showing the
# integer fd and its readable/writable/oobdable flags.
#
# Fixes relative to the previous version:
#   o The conversions were composed in the wrong order;
#     an Iod must first go through psx::iod_to_fd and only
#     then through psx::fd_to_int to yield a printable Int.
#   o The function now recurses on 'rest', so every entry
#     is printed instead of only the first one.
#
fun print_wait_requests ({ io_descriptor, readable, writable, oobdable } ! rest)
	=>
	{   fd = psx::fd_to_int (psx::iod_to_fd io_descriptor);		# Iod -> File_Descriptor -> Int.
	    #
	    printf "fd %d readable %B writable %B oobdable %B\n"
		fd
		readable
		writable
		oobdable
	    ;
	    print_wait_requests rest;					# Continue with the remaining entries.
	};
    print_wait_requests [] => ();
end;
#################################################################
# This section implements our tree mapping (iod+op) -> client_fn:
# Values for our two-bit read/write/oobd field: # "oobd" == "out-of-band data".
#
# Operation codes stored in the low two bits of a client_fns key by fdop_to_index():
read_op = 1;		# Handler to call when the fd is readable.
write_op = 2;		# Handler to call when the fd is writable.
oobd_op = 3;		# Handler to call when out-of-band data is available.
#
# Pack a file descriptor and a two-bit operation code
# (read_op/write_op/oobd_op) into one Int key:
# the fd occupies the bits above the low two, the op the low two.
#
fun fdop_to_index (fd: psx::File_Descriptor, op: Int)
    =
    {   shifted_fd = (psx::fd_to_int fd) << 2;		# Make room for the op code.
	#
	(shifted_fd | op): Int;
    };
#
# Inverse of fdop_to_index: split a key back into
# (fd-as-Int, op) by taking the high bits and the low two bits.
#
fun index_to_fdop (index: Int)
    =
    ( index >> 2,		# fd
      index &  3		# op
    );
#
# Report whether the server hostthread is believed to be running.
#
# If a nonzero pid was recorded and it differs from the pid of the
# process we are now executing in, the recorded state is treated as
# stale (heap dumped to disk and reloaded) and is cleared before
# the flag is read.
#
fun is_running ()
    =
    {   current_pid = wxp::get_process_id ();			# Fetched before taking the mutex.
	#
	hth::acquire_mutex mutex;
	    #
	    stale = (*pid != 0 and *pid != current_pid);
	    #
	    if stale
		pid := 0;
		server_hostthread_is_running := FALSE;		# So that after a heap dump/reload cycle we (correctly) return FALSE.
	    fi;
	    #
	    running = *server_hostthread_is_running;
	    #
	hth::release_mutex mutex;
	#
	running;
    };
#
# TRUE iff no client request is pending.
#
# A pattern match is used instead of (*request_queue == [])
# because Request is not an equality type: its 'reply' fields
# are functions, which cannot be compared for equality.
#
fun request_queue_is_empty ()
    =
    case *request_queue
	#
	_ ! _ =>  FALSE;
	[]    =>  TRUE;
    esac;
#
# Build the minimal wait-request list: a single entry asking
# to be told when the read end of the given pipe is readable.
# This is the pipe clients write to in order to wake the server.
#
fun default_wait_request_list (pipe: Pipe)
    =
    [ { io_descriptor =>  psx::fd_to_iod pipe.infd,
	#
	readable      =>  TRUE,
	writable      =>  FALSE,
	oobdable      =>  FALSE
      }
    ];
#
# Return the timeout currently used for our
# wio::wait_for_io_opportunity calls.
#
# See comments in
#     src/lib/std/src/hostthread/io-wait-hostthread.api
#
# (The path above was previously sitting outside any
# comment, which made the file unparseable.)
#
fun get_timeout_interval ()
    =
    *timeout;
#
# Set the timeout to use for our wio::wait_for_io_opportunity
# calls (C select()/poll()).
#
# The store is done under the mutex so that no mutex-locked
# critical section sees the timeout change out from under it.
#
fun set_timeout_interval time
    =
    {   hth::acquire_mutex mutex;
	    #
	    timeout := time;
	    #
	hth::release_mutex mutex;
    };
# For background on these two see comments in
#
#
#     src/lib/std/src/hostthread/io-wait-hostthread.api
#
# Commented these two out because
# 1) They call external fns while holding the mutex,
# which isn't safe -- could threadswitch and deadlock
# 2) Nobody was calling them anyhow. -- 2012-11-11 CrT
# fun note_per_loop_fn loopfn # Set fn to be called once each time around our outer loop.
# = # This fn is used to drive pre-emptive multi-threading in the
# { # threadkit scheduler:
#     #     src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
#     hth::acquire_mutex mutex;
# #
# if (not (list::in (loopfn, *per_loop_fns))) # Don't add fn to list if it is already in the list.
# #
# per_loop_fns := loopfn ! *per_loop_fns;
# fi;
# #
# hth::release_mutex mutex;
# };
#
# fun drop_per_loop_fn loopfn
# =
# {
# hth::acquire_mutex mutex;
# #
# per_loop_fns := list::drop (loopfn, *per_loop_fns);
# #
# hth::release_mutex mutex;
# };
#
# Shut down the io-wait-hostthread server.
# Internal fn -- executes in the context of the server hostthread.
#
# Steps:
#   1. Under the mutex: clear pid and the running flag, and detach
#      the private pipe from the shared state.
#   2. Outside the mutex: close both pipe fds, so that we do not
#      leak fds if some client stops and starts us repeatedly.
#   3. Broadcast the condvar (wakes stop_server_hostthread_if_running),
#      call the client's reply thunk, and exit this hostthread.
#
# Fix: the pipe is now captured in a local *before* private_pipe is
# set to NULL.  Previously the 'case' examined *private_pipe after
# it had already been cleared, so the close calls were unreachable
# and both fds leaked on every stop.
#
fun do_stop (r: Do_Stop)
    =
    {
	hth::acquire_mutex mutex;
	    #
	    pid := 0;
	    server_hostthread_is_running := FALSE;
	    #
	    old_pipe = *private_pipe;					# Remember the fds before forgetting the pipe.
	    private_pipe := NULL;
	    #
	hth::release_mutex mutex;

	case old_pipe
	    #
	    THE { infd, outfd }
		=>							# NB: We avoid calling external fns while holding the mutex due to threat of threadswitch, re-entry and deadlock.
		{
		    psx::close__without_syscall_redirection infd;
		    psx::close__without_syscall_redirection outfd;
		};
	    NULL => ();
	esac;

	hth::broadcast_condvar condvar;					# This will in particular wake up the loop in stop_server_hostthread_if_running().
	r.reply ();
	#
	hth::hostthread_exit ();
    };
#
# Service an echo request: hand the request's string
# straight back to the client via its reply fn.
# Internal fn -- executes in the context of the server hostthread.
#
fun do_echo ({ what, reply }: Do_Echo)
    =
    reply what;
#
# Wake the main server hostthread from its I/O wait sleep by
# writing a single zero byte to the write end of the private pipe.
#
# The write itself is wrapped in an exception handler which only
# logs the exception message; the lookup of the pipe is NOT inside
# that handler.
# NOTE(review): 'the *private_pipe' presumably raises if the pipe is
# NULL (server not started) -- confirm callers always start the server first.
#
# Fix: the "vector_slice_of_one_byte_unts is from ..." source path
# was sitting outside any comment in the middle of the expression,
# which made the definition unparseable; it is a comment again.
#
fun write_to_private_pipe ()
    =
    {
	pipe = the *private_pipe;
	#
	{   bytes_written						# Ignored.
		=
		psx::write_vector__without_syscall_redirection		# Write one byte into pipe.
		  (
		    pipe.outfd,
		    one_byte_slice_of_one_byte_unts
		  );
	}
	except
	    x = {
		    printf "iow::write_to_private_pipe/XXX EXCEPTION THROWN BY WRITE_VECTOR!\n";
		    (exception_name::exception_message x) -> msg;
		    printf "iow::write_to_private_pipe/XXXb: %s\n" msg;
		};
    }
    where
	one_byte_slice_of_one_byte_unts					# Just anything one byte long to write into our internal pipe.
	    =
	    vector_slice_of_one_byte_unts::make_full_slice		# vector_slice_of_one_byte_unts is from   src/lib/std/src/vector-slice-of-one-byte-unts.pkg
		#
		one_byte_vector_of_one_byte_unts
	    where
		one_byte_vector_of_one_byte_unts
		    =
		    vu1::from_list
			#
			[ (one_byte_unt::from_int 0) ];
	    end;
    end;
#
# fun self_test caller_id # Assumes private_pipe exists -- ie., that server_code() has been called.
# =
# {
# printf "iow::self_test/TOP ... (%s)\n" caller_id;
# printf "iow::self_test/AAA calling wait_for_io_opportunity... (%s)\n" caller_id;
# fds_ready_for_io
# =
# wio::wait_for_io_opportunity
# {
# wait_requests => *wait_requests,
# timeout => THE *timeout
# };
# printf "iow::self_test BBB: Back from waiting for io opportunity, %d fds_ready_for_io: (%s)\n" (list::length fds_ready_for_io) caller_id;
# print_wait_requests fds_ready_for_io;
#
# printf "iow::self_test CCC: Writing to pipe... (%s)\n" caller_id;
# write_to_private_pipe ();
# printf "iow::self_test DDD: Back from writing to pipe... (%s)\n" caller_id;
#
# printf "iow::self_test EEE: calling wait_for_io_opportunity... (%s)\n" caller_id;
# fds_ready_for_io
# =
# wio::wait_for_io_opportunity
# {
# wait_requests => *wait_requests,
# timeout => THE *timeout
# };
# printf "iow::self_test FFF: Back from waiting for io opportunity, %d fds_ready_for_io: (%s)\n" (list::length fds_ready_for_io) caller_id;
# print_wait_requests fds_ready_for_io;
#
# printf "iow::self_test GGG: Doing apply dummy_process_io_ready_fd fds_ready_for_io ... (%s)\n" caller_id;
# apply dummy_process_io_ready_fd fds_ready_for_io; # Handle any new I/O opportunities.
# printf "iow::self_test HHH: Done apply dummy_process_io_ready_fd fds_ready_for_io (%s)\n" caller_id;
# printf "iow::self_test ZZZ (%s)\n" caller_id;
# }
# where
# fun dummy_process_io_ready_fd { io_descriptor => iod, readable, writable, oobdable }
# =
# {
# printf "iow::dummy_process_io_ready_fd/TOP: Doing read_as_vector() (%s)\n" caller_id;
# bytevector
# =
# psx::read_as_vector__without_syscall_redirection # Read and discard the byte that was sent to us.
# {
# file_descriptor => iod,
# max_bytes_to_read => 1
# };
#
# printf "iow::dummy_process_io_ready_fd/AAA: Done read, (vu1::length bytevector) d=%d (%s)\n" (vu1::length bytevector) caller_id;
# if ((vu1::length bytevector) == 0) # We expect to see a 1-byte result.
# #
# printf "iow::dummy_process_io_ready_fd/BBB: ZERO LENGTH RESULT UNEXPECTED! (%s)\n" caller_id;
# wxp::sleep 0.1;
# printf "iow::dummy_process_io_ready_fd/CCC: slept 0.1. (%s)\n" caller_id;
# fi;
# printf "iow::dummy_process_io_ready_fd/ZZZ (%s)\n" caller_id;
# };
# end;
# #
# fun do_test (caller_id: String) # Internal fn -- will execute in context of server hostthread.
# =
# {
# printf "iow::do_test/AAA (%s)\n" caller_id;
# hth::acquire_mutex mutex;
# printf "iow::do_test/BBB (%s)\n" caller_id;
# self_test caller_id;
# printf "iow::do_test/CCC (%s)\n" caller_id;
# self_test_complete := TRUE;
# printf "iow::do_test/DDD (%s)\n" caller_id;
# hth::broadcast_condvar condvar; # This will in particular wake up the loop in stop_server_hostthread_if_running().
# printf "iow::do_test/EEE (%s)\n" caller_id;
# hth::release_mutex mutex;
# printf "iow::do_test/ZZZ (%s)\n" caller_id;
# };
stipulate
# A helper fn shared by
# drop_iod_reader and
# drop_iod_writer and
# drop_iod_oobder:
#
# Return the given red-black tree minus the entry stored under 'key'.
#
fun drop_fn (tree, key)
    =
    im::drop (tree, key);
herein
# For the next six fns there is no obvious
# reason to do the work in the server hostthread,
# so we go ahead and do it in the context of
# the client hostthread:
#
# Start watching the given iod for opportunities to read.
#
# Under the mutex this
#   o stores read_fn in client_fns under the (fd, read_op) key,
#   o turns on the 'readable' flag for the iod in wait_requests
#     (appending a fresh read-only entry if the iod has none), and
#   o increments client_fd_count.
#
fun note_iod_reader { io_descriptor, read_fn }
    =
    {   key = fdop_to_index (psx::iod_to_fd io_descriptor, read_op);
	#
	hth::acquire_mutex mutex;
	    #
	    client_fns      :=  im::set (*client_fns, key, read_fn);
	    wait_requests   :=  with_read_enabled (*wait_requests, io_descriptor);
	    client_fd_count :=  *client_fd_count + 1;
	    #
	hth::release_mutex mutex;
    }
    where
	fun with_read_enabled ([], iod)						# No existing entry for iod: add a read-only one.
		=>
		[ { io_descriptor => iod, readable => TRUE, writable => FALSE, oobdable => FALSE } ];

	    with_read_enabled (((plea as { io_descriptor, readable, writable, oobdable }) ! others), iod)
		=>
		if (io_descriptor != iod)   plea ! with_read_enabled (others, iod);
		else                        { io_descriptor, readable => TRUE, writable, oobdable } ! others;	# Existing entry for iod: just add 'read' to it.
		fi;
	end;
    end;
#
# Stop watching the given iod for opportunities to read.
#
# Under the mutex this
#   o removes the (fd, read_op) handler from client_fns,
#   o clears the 'readable' flag for the iod in wait_requests,
#     dropping the entry entirely if 'read' was its only request, and
#   o decrements client_fd_count.
#
fun drop_iod_reader (iod: wio::Iod)
    =
    {   key = fdop_to_index (psx::iod_to_fd iod, read_op);
	#
	hth::acquire_mutex mutex;
	    #
	    client_fns      :=  drop_fn (*client_fns, key);
	    wait_requests   :=  without_read (*wait_requests, iod);
	    client_fd_count :=  *client_fd_count - 1;
	    #
	hth::release_mutex mutex;
    }
    where
	fun without_read ([], _)
		=>
		[];

	    without_read (((plea as { io_descriptor, readable, writable, oobdable }) ! others), iod)
		=>
		if (io_descriptor != iod or (not readable))   plea ! without_read (others, iod);				# Not the entry we want: keep it and keep looking.
		elif (writable or oobdable)                   { io_descriptor, readable => FALSE, writable, oobdable } ! others;	# Other requests remain for that iod, so drop only the 'read' one.
		else                                          others;								# 'read' was the only request for that iod, so drop the entry completely.
		fi;
	end;
    end;
#
# Start watching the given iod for opportunities to write.
#
# Under the mutex this
#   o stores write_fn in client_fns under the (fd, write_op) key,
#   o turns on the 'writable' flag for the iod in wait_requests
#     (appending a fresh write-only entry if the iod has none), and
#   o increments client_fd_count.
#
fun note_iod_writer { io_descriptor, write_fn }
    =
    {   key = fdop_to_index (psx::iod_to_fd io_descriptor, write_op);
	#
	hth::acquire_mutex mutex;
	    #
	    client_fns      :=  im::set (*client_fns, key, write_fn);
	    wait_requests   :=  with_write_enabled (*wait_requests, io_descriptor);
	    client_fd_count :=  *client_fd_count + 1;
	    #
	hth::release_mutex mutex;
    }
    where
	fun with_write_enabled ([], iod)					# No existing entry for iod: add a write-only one.
		=>
		[ { io_descriptor => iod, readable => FALSE, writable => TRUE, oobdable => FALSE } ];

	    with_write_enabled (((plea as { io_descriptor, readable, writable, oobdable }) ! others), iod)
		=>
		if (io_descriptor != iod)   plea ! with_write_enabled (others, iod);
		else                        { io_descriptor, readable, writable => TRUE, oobdable } ! others;	# Existing entry for iod: just add 'write' to it.
		fi;
	end;
    end;
#
# Stop watching the given iod for opportunities to write.
#
# Under the mutex this
#   o removes the (fd, write_op) handler from client_fns,
#   o clears the 'writable' flag for the iod in wait_requests,
#     dropping the entry entirely if 'write' was its only request, and
#   o decrements client_fd_count.
#
fun drop_iod_writer (iod: wio::Iod)
    =
    {   key = fdop_to_index (psx::iod_to_fd iod, write_op);
	#
	hth::acquire_mutex mutex;
	    #
	    client_fns      :=  drop_fn (*client_fns, key);
	    wait_requests   :=  without_write (*wait_requests, iod);
	    client_fd_count :=  *client_fd_count - 1;
	    #
	hth::release_mutex mutex;
    }
    where
	fun without_write ([], _)
		=>
		[];

	    without_write (((plea as { io_descriptor, readable, writable, oobdable }) ! others), iod)
		=>
		if (io_descriptor != iod or (not writable))   plea ! without_write (others, iod);				# Not the entry we want: keep it and keep looking.
		elif (readable or oobdable)                   { io_descriptor, readable, writable => FALSE, oobdable } ! others;	# Other requests remain for that iod, so drop only the 'write' one.
		else                                          others;								# 'write' was the only request for that iod, so drop the entry completely.
		fi;
	end;
    end;
#
# Start watching the given iod for opportunities to read
# out-of-band data ("oobd").
#
# Under the mutex this
#   o stores oobd_fn in client_fns under the (fd, oobd_op) key,
#   o turns on the 'oobdable' flag for the iod in wait_requests
#     (appending a fresh oobd-only entry if the iod has none), and
#   o increments client_fd_count.
#
# Fix: the fresh entry created when the iod had no previous
# request used to be { writable => TRUE, oobdable => FALSE },
# i.e. it asked for write-readiness instead of out-of-band data.
# The server would then have looked for a write handler that was
# never registered for this iod.  The entry now requests oobd only.
#
fun note_iod_oobder { io_descriptor, oobd_fn }
    =
    {   index = fdop_to_index( (psx::iod_to_fd io_descriptor), oobd_op );
	#
	hth::acquire_mutex mutex;
	    #
	    client_fns := im::set( *client_fns, index, oobd_fn );
	    #
	    wait_requests := note_oobd_request (*wait_requests, io_descriptor);
	    #
	    client_fd_count := *client_fd_count + 1;
	    #
	hth::release_mutex mutex;
    }
    where
	fun note_oobd_request ([], iod)								# Add 'oobd' request for given iod.
		=>
		[ { io_descriptor => iod, readable => FALSE, writable => FALSE, oobdable => TRUE } ];

	    note_oobd_request (((req as { io_descriptor, readable, writable, oobdable }) ! rest), iod)
		=>
		if (io_descriptor == iod)   { io_descriptor, readable, writable, oobdable => TRUE } ! rest;	# Previous op request(s) for that iod, so just add 'oobd' to them.
		else                        req ! note_oobd_request (rest, iod);
		fi;
	end;
    end;
#
# Stop watching the given iod for opportunities to read
# out-of-band data.
#
# Under the mutex this
#   o removes the (fd, oobd_op) handler from client_fns,
#   o clears the 'oobdable' flag for the iod in wait_requests,
#     dropping the entry entirely if 'oobd' was its only request, and
#   o decrements client_fd_count.
#
fun drop_iod_oobder (iod: wio::Iod)
    =
    {   key = fdop_to_index (psx::iod_to_fd iod, oobd_op);
	#
	hth::acquire_mutex mutex;
	    #
	    client_fns      :=  drop_fn (*client_fns, key);
	    wait_requests   :=  without_oobd (*wait_requests, iod);
	    client_fd_count :=  *client_fd_count - 1;
	    #
	hth::release_mutex mutex;
    }
    where
	fun without_oobd ([], _)
		=>
		[];

	    without_oobd (((plea as { io_descriptor, readable, writable, oobdable }) ! others), iod)
		=>
		if (io_descriptor != iod or (not oobdable))   plea ! without_oobd (others, iod);				# Not the entry we want: keep it and keep looking.
		elif (readable or writable)                   { io_descriptor, readable, writable, oobdable => FALSE } ! others;	# Other requests remain for that iod, so drop only the 'oobd' one.
		else                                          others;								# 'oobd' was the only request for that iod, so drop the entry completely.
		fi;
	end;
    end;
end;
#
# Ask the server hostthread to stop, and block until it has.
# External fn -- executes in the context of the client hostthread.
#
# Does nothing if the running flag is not set.  Otherwise it
# queues a DO_STOP request, wakes the server via the private
# pipe, and then waits on the condvar until the running flag
# has been cleared (do_stop() clears it and broadcasts).
#
fun stop_server_hostthread_if_running (request: Do_Stop)
    =
    {
	if (not *server_hostthread_is_running)
	    #
	    ();								# Nothing to stop.
	else
	    hth::acquire_mutex mutex;
		#
		request_queue := (DO_STOP request) ! *request_queue;
		#
	    hth::release_mutex mutex;
	    #
	    write_to_private_pipe ();					# Wake up main server hostthread to process request.
	    #								# DO NOT DO THIS WHILE HOLDING THE MUTEX!
	    hth::acquire_mutex mutex;
		#
		for (*server_hostthread_is_running) {
		    #
		    hth::wait_on_condvar (condvar, mutex);		# Woken by the broadcast in do_stop().
		};
		#
	    hth::release_mutex mutex;
	fi;
    };
#
# Queue up an echo-back-to-client request for later execution
# by do_echo, then wake the server.  (Mostly for unit testing.)
# External fn -- executes in the context of the client hostthread.
#
fun echo (request: Do_Echo)
    =
    {   entry = DO_ECHO request;
	#
	hth::acquire_mutex mutex;
	    #
	    request_queue := entry ! *request_queue;
	    #
	hth::release_mutex mutex;
	#
	write_to_private_pipe ();					# Wake up main server hostthread; done after releasing the mutex.
    };
#
# fun test (caller_id: String) # WE ASSUME ONLY ONE CALLER AT A TIME, SO WE DON'T WORRY ABOUT MUTUAL EXCLUSION.
# = # External fn -- will execute in context of client hostthread.
# {
# printf "iow::test/AAA acquiring mutex... (%s)\n" caller_id;
# hth::acquire_mutex mutex;
# #
# printf "iow::test/BBB acquired mutex... (%s)\n" caller_id;
# self_test_complete := FALSE;
#
# printf "iow::test CCC (%s)\n" caller_id;
# request_queue := (DO_TEST caller_id) ! *request_queue;
# #
# hth::release_mutex mutex;
# printf "iow::test DDD (%s)\n" caller_id;
# write_to_private_pipe(); # Wake up main server hostthread to process request.
# # DO NOT DO THIS WHILE HOLDING THE MUTEX!
# hth::acquire_mutex mutex;
#
# printf "iow::test EEE (%s)\n" caller_id;
# for (not *self_test_complete) {
# #
# printf "iow::test FFF (%s)\n" caller_id;
# hth::wait_on_condvar (condvar, mutex); # This condvar will wake us each time running_servers_count changes.
# };
#
# #
# printf "iow::test/GGG releasing mutex... (%s)\n" caller_id;
# hth::release_mutex mutex;
# printf "iow::test/ZZZ released mutex. (%s)\n" caller_id;
# };
#
# Atomically take every pending request off the request queue
# and return them oldest-first.
#
# Clients push new requests onto the head of the list, so the
# captured list is reversed to restore submission order.
#
fun get_new_requests ()
    =
    {   hth::acquire_mutex mutex;
	    #
	    pending = *request_queue;
	    request_queue := [];
	    #
	hth::release_mutex mutex;
	#
	reverse pending;
    };
#
# Entry point of the server hostthread.
#
# Under the mutex it marks the server as running, creates a fresh
# private wake-up pipe, resets wait_requests to watch only that pipe,
# zeroes client_fd_count, resets the timeout and broadcasts the condvar.
# It then releases the mutex and enters server_loop(), which does not
# return normally (it calls itself after each pass).
#
# The three helper fns in the 'where' clause are:
#   service_request      Dispatch one queued client request.
#   process_io_ready_fd  Handle one ready fd reported by the I/O wait.
#   server_loop          One wait/dispatch pass, then repeat.
#
fun server_code () # Our server hostthread begins execution here.
=
{
hth::set_hostthread_name "io wait";
hth::acquire_mutex mutex;
#
server_hostthread_is_running := TRUE;
private_pipe := THE (psx::make_pipe__without_syscall_redirection ()); # We do not close any existing pipe fds here because they might be stale stuff from before a heap dump/load cycle, in which
# case closing them might close something we actually want this time around -- that would produce very mysterious bugs!
wait_requests := default_wait_request_list (the *private_pipe); # By default we listen only on our private pipe.
client_fd_count := 0; # As yet we are watching no fds for clients.
# timeout := tim::from_float_seconds 0.02; # Start up with timeout frequency set to 50Hz.
# THIS VALUE IS ONLY A TEMPORARY DEBUG HACK:
timeout := tim::from_float_seconds 0.5;
hth::broadcast_condvar condvar; # This will in particular wake up the loop in start_server_hostthread_if_not_running().
#
hth::release_mutex mutex;
#
server_loop ();
}
where
# Dispatch one client request to its handler.
# NB: There is no clause for DO_TEST (it is commented out below).
#
fun service_request (DO_STOP r) => do_stop r;
service_request (DO_ECHO r) => do_echo r;
# service_request (DO_TEST r) => do_test r;
end;
#
# Handle one entry of the ready-list returned by the I/O wait.
#
# For any iod other than the read end of our private pipe, each flag
# that is set (readable/writable/oobdable) causes the handler stored
# in client_fns under the matching (fd, op) key to be called on the iod.
# NOTE(review): 'the (im::get ...)' presumably raises if no handler is
# registered under that key (e.g. dropped since the wait began) -- confirm.
#
# For the private pipe, one byte is read and discarded; a zero-length
# read is treated as unexpected EOF and terminates the process.
#
fun process_io_ready_fd { io_descriptor => iod, readable, writable, oobdable }
=
{
pipe = the *private_pipe;
#
if (iod != psx::fd_to_iod pipe.infd)
#
# Normal case: An I/O opportunity has appeared
# on a client-specified fd, so call the corresponding
# client-supplied handler fn:
#
if readable index = fdop_to_index ((psx::iod_to_fd iod), read_op); client_fn = the (im::get( *client_fns, index )); client_fn iod; fi;
if writable index = fdop_to_index ((psx::iod_to_fd iod), write_op); client_fn = the (im::get( *client_fns, index )); client_fn iod; fi;
if oobdable index = fdop_to_index ((psx::iod_to_fd iod), oobd_op); client_fn = the (im::get( *client_fns, index )); client_fn iod; fi;
else
# Special case: A byte has been sent to us on our
# private pipe to wake us from our normal I/O wait.
# It means that request_queue holds client-hostthread
# request(s) for us to process:
#
bytevector
=
psx::read_as_vector__without_syscall_redirection # Read and discard the byte that was sent to us.
{
file_descriptor => pipe.infd,
max_bytes_to_read => 1
};
# Sanity check:
#
if ((vu1::length bytevector) == 0) # We expect to see a 1-byte result.
# # (We request at most one byte per wake-up; further bytes stay in the pipe for later passes.)
#
printf "iow::process_io_ready_fd/OOO: error: Exiting due unexpected EOF on private pipe. -- src/lib/std/src/hostthread/io-wait-hostthread.pkg\n";
wxp::exit_uncleanly 1;
fi;
();
fi;
};
#
# One pass of the server: wait for I/O (or timeout), run handlers for
# every ready fd, service all queued client requests, call every
# per-loop fn, then repeat by calling itself.
#
# Any exception raised during a pass is logged (name and message)
# and then re-raised.
#
fun server_loop () # This is the outer loop for the io-wait server hostthread.
=
{
{
fds_ready_for_io
=
wio::wait_for_io_opportunity__without_syscall_redirection # We *are* a secondary hostthread, so it makes no sense to redirect this call via a(nother) secondary hostthread.
{ # Also, it wouldn't work because we don't have the infrastructure to harvest the responses to redirected calls.
wait_requests => *wait_requests,
timeout => THE *timeout
};
apply process_io_ready_fd fds_ready_for_io; # Handle any new I/O opportunities.
apply service_request (get_new_requests()); # Handle any new requests from client hostthreads.
apply (\\ f = *f()) *per_loop_fns; # Give the threadkit scheduler an irregular "clock" pulse to drive preemptive timeslicing.
#
} except x = { # NB: Placing this 'except' clause at Position P would result in a memory leak, as of 2013-04-06 at least.
printf "error: iow::server_loop: Exception!\n";
printf "error: iow::server_loop/exception name s='%s'\n" (exceptions::exception_name x);
printf "error: iow::server_loop/exception msg s='%s'\n" (exceptions::exception_message x);
raise exception x; # Should probably shut down hard and sudden here. XXX SUCKO FIXME.
};
server_loop (); # The recursive call sits outside the exception handler (see NB above).
}; # Position P.
end;
#
# Start the server hostthread unless it is already running, and
# block until it has announced itself as running.
# External fn -- executes in the context of the client hostthread.
#
# 'per_who' is a string identifying the client requesting the
# startup, for logging purposes.  (It is not currently used.)
#
# Steps:
#   1. If a nonzero pid is recorded and it is not our own, the
#      recorded state is stale (heap dumped and reloaded in another
#      process): clear the running flag.
#   2. If the server is not running, record our pid, spawn the
#      server hostthread and wait on the condvar until server_code()
#      sets the running flag and broadcasts.
#
# Fix: 'pid' was never set to a nonzero value anywhere -- its only
# non-zero assignment was guarded by (*pid != 0) -- so the stale-heap
# checks here and in is_running() could never fire.  We now record
# the pid of the starting process, under the mutex, before spawning.
#
fun start_server_hostthread_if_not_running per_who
    =
    {
	actual_pid = wxp::get_process_id ();				# Fetched before taking the mutex.
	#
	hth::acquire_mutex mutex;
	    #
	    if (*pid != 0 and *pid != actual_pid)
		pid := actual_pid;
		server_hostthread_is_running := FALSE;			# Ensure that if the heap gets dumped to disk and then reloaded, *server_hostthread_is_running will still be correct.
	    fi;
	    #
	hth::release_mutex mutex;
	#
	if (not *server_hostthread_is_running)
	    #
	    hth::acquire_mutex mutex;
		#
		pid := actual_pid;					# Record which process owns the server we are about to start.
		#
		hth::spawn_hostthread server_code;
		#
		for (not *server_hostthread_is_running) {
		    #
		    hth::wait_on_condvar (condvar, mutex);		# Woken by the broadcast in server_code().
		};
	    hth::release_mutex mutex;
	fi;
    };
# Report whether this server could still cause some thread
# to become runnable.
#
# Returns FALSE if the server is not running.  Otherwise returns
# TRUE iff we are watching at least one fd for clients or have
# at least one queued request; both are sampled under the mutex.
#
# Fix: the threadkit-base-for-os-g.pkg source path in the header
# comment was sitting outside any comment, between the fn header
# and its '=', which made the definition unparseable.
#
fun is_doing_useful_work ()
    #
    # This is support for
    #
    #     no_runnable_threads_left__fate
    # from
    #     src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg
    #
    # which is tasked with exit()ing if the system is
    # deadlocked -- which is to say, no thread ready
    # to run and provably no prospect of ever having
    # a thread ready to run.
    #
    # If we are listening on any fd, then we can eventually
    # wake up some thread when input arrives on that fd, so
    # the system is not truly deadlocked and no_runnable_threads_left__fate()
    # should not exit:
    =
    if (not (is_running ()))   FALSE;					# If we're not running, we're not doing useful work. :-)
    else
	hth::acquire_mutex mutex;
	    #
	    got_client_fds = *client_fd_count > 0;			# Watching at least one fd for clients?
	    got_requests   = case *request_queue			# Any client request still queued?
				 [] => FALSE;
				 _  => TRUE;
			     esac;
	    #
	hth::release_mutex mutex;
	#
	got_client_fds or got_requests;
    fi;
};
end;
#######################################################################
# Note[1]
# Most hostthread servers block on their request_queue when not busy,
# and thus can be woken by a simple hth::broadcast_condvar, but
# here we will be spending almost all our time blocked in a select(),
# so that will not work.
#
# Consequently we use a dedicated pipe to wake our server hostthread.
# By always including a read of this pipe in our wio::wait_for_io_opportunity
# call (== C select()/poll()) we ensure that the server hostthread can
# always be woken just by writing one byte to the pipe.
#
# NB: Modern Linux/Posix provide pselect()/ppoll() calls which can wait
# on signals as well as fds, so an alternative would be to use them
# and then wake-via-signal. But signals are public and finite in number
# whereas pipes are private and unbounded in number, so the pipe
# solution still seems cleaner, and efficiency is a non-issue in
# this application (we do not expect to use this mechanism heavily).