


# io-wait-pthread.pkg
#
# For general background, see "Overview" comments in src/lib/std/src/pthread/io-wait-pthread.api
#
# Our basic task here is to watch a set of file descriptors
# (typically pipes or sockets) and when an I/O opportunity
# arrives (almost always data now available to read) call
# a corresponding client-provided closure to take advantage
# of that opportunity.
#
# An I/O opportunity can be one of three things:
#
# o Input file descriptor now has data available to be read. (The typical case of interest.)
# o Output file descriptor can now accept a data write. (Much rarer -- usually we just do a blocking write.)
# o Out-of-band-data is available to be read. (Vanishingly rare in practice.)
#
# So our main datastructure requirement is to be able to map
# (filedescriptor, action)
# pairs to corresponding client-provided closures ("functions").
#
# Since fds are short integers and actions can be represented
# in two bits, we fold fd+action into a single int which can
# then be used to index a stock red-black tree.
#
#
# See also:
#
#     src/lib/std/src/pthread/cpu-bound-task-pthreads.pkg
#     src/lib/std/src/pthread/io-bound-task-pthreads.pkg

# Compiled by:
#     src/lib/std/standard.lib

stipulate
    package fil =  file;                        # file                  is from   src/lib/std/src/posix/file.pkg
    package im  =  int_red_black_map;           # int_red_black_map     is from   src/lib/src/int-red-black-map.pkg
    package psx =  posix_1003_1b;               # posix_1003_1b         is from   src/lib/std/src/posix-1003.1b/posix-1003-1b.pkg
    package pth =  pthread;                     # pthread               is from   src/lib/std/src/pthread.pkg
    package tim =  time;                        # time                  is from   src/lib/std/time.pkg
    package wio =  winix::io;                   # winix::io             is from   src/lib/std/src/posix/winix-io.pkg
    package wxp =  winix::process;              # winix::process        is from   src/lib/std/src/posix/winix-process.pkg
herein
package io_wait_pthread
: (weak) Io_Wait_Pthread # Io_Wait_Pthread is from src/lib/std/src/pthread/io-wait-pthread.api
{
#################################################################
# BEGIN type definitions section
# A pipe, held as its two file descriptors.
# In this file 'outfd' is the end written to by write_to_private_pipe
# and 'infd' is the end read (and waited on) by the server loop.
#
Pipe = { infd: psx::File_Descriptor,
outfd: psx::File_Descriptor
};
# One record type for each request
# supported by the server:
#
Do_Stop = { who: String, reply: Void -> Void }; # Edit to suit. The 'reply' thunks will typically send a do_something() request back to the originating pthread.
Do_Echo = { what: String, reply: String -> Void };
# The next three records describe a (descriptor, handler) registration.
# They are NOT members of Request below: the note_iod_* fns in this
# file do their work directly in the calling pthread instead of
# queueing a request for the server pthread.
#
Do_Note_Iod_Reader
=
{ io_descriptor: wio::Io_Descriptor,
read_fn: wio::Io_Descriptor -> Void # Call this fn (closure) on iod whenever input is available on fd.
};
Do_Note_Iod_Writer
=
{ io_descriptor: wio::Io_Descriptor,
write_fn: wio::Io_Descriptor -> Void # Call this fn (closure) on iod whenever output is possible on fd.
};
Do_Note_Iod_Oobder
=
{ io_descriptor: wio::Io_Descriptor,
oobd_fn: wio::Io_Descriptor -> Void # Call this fn (closure) on iod whenever out-of-band data ("oobd") is available on this fd.
};
# Union of the queued request record types, so that we can keep
# them all in one queue (request_queue). Only stop and echo
# requests are ever queued.
#
Request = DO_STOP Do_Stop # Union of above record types, so that we can keep them all in one queue.
| DO_ECHO Do_Echo
;
# END of type definitions section
#################################################################
#################################################################
# BEGIN MUTABLE STATE SECTION
#
pid = REF 0;                                                            # pid of current process while server is running, otherwise zero.

client_fns
    =
    REF (im::empty: im::Map( wio::Io_Descriptor -> Void ));            # When an fd is ready to read, we'll look it up in this and call resulting read_fn on it. (Or similar for writing or out-of-band data.)
                                                                        # Keys are the folded (fd, op) ints produced by fdop_to_index.

private_pipe = REF (NULL: Null_Or(Pipe));                               # This is a private pipe we use to wake our server pthread by sending it a byte.
                                                                        # Since it spends all of its time sleeping on a wio::wait_for_io_opportunity (C select()/poll())
                                                                        # call, sending it a byte is the simplest and best way to wake it. (See Note[1].)

wait_requests = REF ([]: List( wio::Wait_Request ));                    # This is the set of file descriptors to watch for I/O opportunities via wio::wait_for_io_opportunity (C select()/poll()).
                                                                        # INVARIANT: No two entries on list have same Io_Descriptor value.

timeout = REF (tim::from_float_seconds 0.02);                           # Set up to timeout at 50Hz.

                                                                        # thread_scheduler is from src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
per_loop_fns = REF ([]: List( Ref(Void -> Void)) );                     # Functions to call once each time around the main outer loop.
                                                                        # See note_per_loop_fn/drop_per_loop_fn comments in src/lib/std/src/pthread/io-wait-pthread.api

request_queue = REF ([]: List(Request));                                # Queue of pending requests from client pthreads.
                                                                        # Clients push onto the front; get_new_requests reverses to restore arrival order.

mutex = pth::make_mutex ();                                             # These will actually survive a heap save/load cycle, pretty much.
condvar = pth::make_condvar ();
# END OF MUTABLE STATE SECTION
#################################################################
# A little debug-support function, not used in production code.
#
# Prints one line per entry of the given wait-request list, showing
# the integer file descriptor and the three readable/writable/oobdable
# flags of that entry.
#
fun print_wait_requests ({ io_descriptor, readable, writable, oobdable } ! rest)
        =>
        {   fd = psx::fd_to_int (psx::iod_to_fd io_descriptor);        # Io_Descriptor -> File_Descriptor -> Int, same conversion chain as fdop_to_index uses.
            #
            printf "fd %d readable %b writable %b oobdable %b\n"
                fd
                readable
                writable
                oobdable
            ;
            #
            print_wait_requests rest;                                   # Continue with the remaining entries; without this only the first entry is printed.
        };

    print_wait_requests []
        =>
        ();
end;
#################################################################
# This section implements our tree mapping (iod+op) -> client_fn:
# Values for our two-bit read/write/oobd field: # "oobd" == "out-of-band data".
#
# Operation codes folded into the low two bits of a client_fns key
# by fdop_to_index. All three values fit in two bits.
#
read_op = 1;
write_op = 2;
oobd_op = 3;
#
# Fold a file descriptor and a two-bit operation code
# (read_op/write_op/oobd_op) into one Int usable as a key
# in the client_fns map: the fd number occupies the bits
# above the low two, the op code occupies the low two bits.
#
fun fdop_to_index (fd: psx::File_Descriptor, op: Int)
    =
    {   packed = ((psx::fd_to_int fd) << 2) | op;
        #
        packed: Int;
    };
#
# Inverse of fdop_to_index: split a folded key back into
# (fd number, op code). The op code is the low two bits,
# the fd number is everything above them.
#
fun index_to_fdop (index: Int)
    =
    ( index >> 2,
      index & 3
    );
#
# Report whether the server is running in the current process.
#
# We record our process id in 'pid' at startup and zero it at
# shutdown. Comparing against the live process id means that if
# the heap gets dumped to disk and then reloaded, is_running()
# will (correctly) return FALSE even though pid may not have
# gotten zeroed.
#
# There is of course a small chance that by accident we still have
# the same pid after a save/reload, in which case we lose. XXX BUGGO FIXME.
# A fix might be to have a generation number associated with each
# heap image which gets incremented on every save/load cycle.
#
fun is_running ()
    =
    if (*pid == 0)   FALSE;
    else             *pid == wxp::get_process_id ();
    fi;
#
# TRUE iff no client requests are pending.
#
# We pattern-match instead of writing (*request_queue == [])
# because Request is not an equality type: the 'reply' fields
# are functions, and Mythryl does not support comparison of
# thunks for equality.
#
fun request_queue_is_empty ()
    =
    case *request_queue
        _ ! _ =>  FALSE;
        []    =>  TRUE;
    esac;
#
# Build the minimal wait-request list: a single entry asking
# only for read-readiness on the input end of the given pipe,
# which is the pipe clients write to in order to wake us.
#
fun default_wait_request_list (pipe: Pipe)
    =
    [ { io_descriptor =>  psx::fd_to_iod pipe.infd,
        #
        readable      =>  TRUE,
        writable      =>  FALSE,
        oobdable      =>  FALSE
      }
    ];
#
# Return the timeout currently used for our wio::wait_for_io_opportunity calls.
# See comments in src/lib/std/src/pthread/io-wait-pthread.api
#
fun get_timeout_interval ()
    =
    *timeout;
# Set timeout to use for our wio::wait_for_io_opportunity calls (C select()/poll().)
#
# Taking the mutex here does have a point: it ensures that no
# mutex-locked critical section will see the timeout changing
# unexpectedly out from under it.
#
fun set_timeout_interval interval
    =
    {   pth::acquire_mutex mutex;
        timeout := interval;
        pth::release_mutex mutex;
    };
# For background on these two see comments in
#
# src/lib/std/src/pthread/io-wait-pthread.api #
# Set fn to be called once each time around our outer loop.
#
# This fn is used to drive pre-emptive multi-threading in the
# threadkit scheduler:  src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
#
# 'loopfn' is a Ref(Void -> Void); it is added to per_loop_fns
# unless that same ref is already on the list. The update is
# done with 'mutex' held, matching drop_per_loop_fn.
#
fun note_per_loop_fn loopfn
    =
    {   pth::acquire_mutex mutex;                                       # Must pair with the release below.
        #
        if (not (list::in (loopfn, *per_loop_fns)))                     # Don't add fn to list if it is already in the list.
            #
            per_loop_fns := loopfn ! *per_loop_fns;
        fi;
        #
        pth::release_mutex mutex;
    };
# Remove 'loopfn' from the list of fns called once per
# trip around the server's outer loop. The list is updated
# with 'mutex' held.
#
fun drop_per_loop_fn loopfn
    =
    {   pth::acquire_mutex mutex;
        #
        remaining = list::drop (loopfn, *per_loop_fns);
        #
        per_loop_fns := remaining;
        #
        pth::release_mutex mutex;
    };
#
# Shut down io-wait-pthread server.
# Internal fn -- will execute in context of server pthread.
#
# Steps, in order: close both ends of the private pipe (if any),
# log the shutdown, zero 'pid' so is_running() reports FALSE,
# call the requester's reply thunk, then exit this pthread.
#
fun do_stop (r: Do_Stop)
    =
    {   case *private_pipe
            #
            NULL => ();
            #
            THE pipe                                                    # Close pipe fds, so that we don't leak fds continually
                =>                                                      # if some client stops and starts us continually.
                {   psx::close pipe.infd;
                    psx::close pipe.outfd;
                };
        esac;
        #
        fil::log .{ "src/lib/std/src/pthread/io-wait-pthread.pkg: Shutting down per request from '" + r.who + "'."; };
        #
        pid := 0;
        #
        r.reply ();
        #
        pth::pthread_exit ();
    };
#
# Hand the request's 'what' string straight back to the
# requester via its 'reply' fn.
# Internal fn -- will execute in context of server pthread.
#
fun do_echo ({ what, reply }: Do_Echo)
    =
    reply what;
stipulate
# A helper fn shared by
# drop_iod_reader and
# drop_iod_writer and
# drop_iod_oobder:
#
fun drop_fn (old_tree, index)                                           # Drop fn with given index from our red-black tree.
    =                                                                   # im::drop returns (new tree, dropped value); we keep only the tree.
    case (im::drop (old_tree, index))
        #
        (new_tree, _) =>  new_tree;
    esac;
herein
# For the next six fns there is no obvious
# reason to do the work in the server pthread,
# so we go ahead and do it in the context of
# the client pthread:
#
# Start watching for opportunities to read given iod.
#
# Registers read_fn in client_fns under the (fd, read_op) key and
# makes sure wait_requests has an entry for io_descriptor with
# readable => TRUE. Both updates happen with 'mutex' held.
#
fun note_iod_reader { io_descriptor, read_fn }
    =
    {   index = fdop_to_index (psx::iod_to_fd io_descriptor, read_op);
        #
        pth::acquire_mutex mutex;
        #
        client_fns    := im::set (*client_fns, index, read_fn);
        #
        wait_requests := with_read_request *wait_requests;
        #
        pth::release_mutex mutex;
    }
    where
        fun with_read_request []                                        # No existing entry for our iod, so append a fresh read-only one.
                =>
                [ { io_descriptor, readable => TRUE, writable => FALSE, oobdable => FALSE } ];

            with_read_request ((req as { io_descriptor => iod, readable, writable, oobdable }) ! rest)
                =>
                if (iod == io_descriptor)   { io_descriptor => iod, readable => TRUE, writable, oobdable } ! rest;     # Previous op request(s) for that iod, so just add 'read' to them.
                else                        req ! with_read_request rest;
                fi;
        end;
    end;
#
# Stop watching for opportunities to read given iod.
#
# Removes the (fd, read_op) handler from client_fns and clears the
# read request for iod in wait_requests. Both updates happen with
# 'mutex' held.
#
fun drop_iod_reader (iod: wio::Io_Descriptor)
    =
    {   index = fdop_to_index (psx::iod_to_fd iod, read_op);
        #
        pth::acquire_mutex mutex;
        #
        client_fns    := drop_fn (*client_fns, index);
        #
        wait_requests := without_read_request *wait_requests;
        #
        pth::release_mutex mutex;
    }
    where
        fun without_read_request []
                =>
                [];

            without_read_request ((req as { io_descriptor, readable, writable, oobdable }) ! rest)
                =>
                if (io_descriptor == iod and readable)
                    #
                    case (writable, oobdable)
                        #
                        (FALSE, FALSE) =>  rest;                                                                   # Only request for that iod is 'read', so drop it completely.
                        _              =>  { io_descriptor, readable => FALSE, writable, oobdable } ! rest;       # Multiple op requests for that iod, so drop only the 'read' one.
                    esac;
                else
                    req ! without_read_request rest;                                                               # Different iod, or no read request on this entry: keep it and keep scanning.
                fi;
        end;
    end;
#
# Start watching for opportunities to write given iod.
#
# Registers write_fn in client_fns under the (fd, write_op) key and
# makes sure wait_requests has an entry for io_descriptor with
# writable => TRUE. Both updates happen with 'mutex' held.
#
fun note_iod_writer { io_descriptor, write_fn }
    =
    {   index = fdop_to_index (psx::iod_to_fd io_descriptor, write_op);
        #
        pth::acquire_mutex mutex;
        #
        client_fns    := im::set (*client_fns, index, write_fn);
        #
        wait_requests := with_write_request *wait_requests;
        #
        pth::release_mutex mutex;
    }
    where
        fun with_write_request []                                       # No existing entry for our iod, so append a fresh write-only one.
                =>
                [ { io_descriptor, readable => FALSE, writable => TRUE, oobdable => FALSE } ];

            with_write_request ((req as { io_descriptor => iod, readable, writable, oobdable }) ! rest)
                =>
                if (iod == io_descriptor)   { io_descriptor => iod, readable, writable => TRUE, oobdable } ! rest;    # Previous op request(s) for that iod, so just add 'write' to them.
                else                        req ! with_write_request rest;
                fi;
        end;
    end;
#
# Stop watching for opportunities to write given iod.
#
# Removes the (fd, write_op) handler from client_fns and clears the
# write request for iod in wait_requests. Both updates happen with
# 'mutex' held.
#
fun drop_iod_writer (iod: wio::Io_Descriptor) # Stop watching for opportunities to write given iod.
=
{ index = fdop_to_index( (psx::iod_to_fd iod), write_op );
#
pth::acquire_mutex mutex;
#
client_fns := drop_fn (*client_fns, index);
#
wait_requests := drop_write_request (*wait_requests, iod);
#
pth::release_mutex mutex;
}
where
fun drop_write_request ([], iod) # Drop 'write' request for given iod.
=>
[];
# Entry asks for 'write' only:
drop_write_request ( ( (req as { io_descriptor, readable => FALSE,
writable => TRUE,
oobdable => FALSE
}
)
!
rest
),
iod
)
=>
if (io_descriptor == iod) rest; # Only request for that iod is 'write', so drop it completely.
else req ! drop_write_request (rest, iod); # Some other iod: keep entry and keep scanning.
fi;
#
# Entry asks for 'write' plus at least one other op:
drop_write_request (((req as { io_descriptor, readable, writable => TRUE, oobdable }) ! rest), iod)
=>
if (io_descriptor == iod) { io_descriptor, readable, writable => FALSE, oobdable } ! rest; # Multiple op requests for that iod, so drop only the 'write' one.
else req ! drop_write_request (rest, iod);
fi;
# Entry has no 'write' request at all: keep it unchanged and keep scanning.
drop_write_request (req ! rest, iod)
=>
req ! drop_write_request (rest, iod);
end;
end;
#
# Start watching for opportunities to read out-of-band data from given iod.
#
# Registers oobd_fn in client_fns under the (fd, oobd_op) key and
# makes sure wait_requests has an entry for io_descriptor with
# oobdable => TRUE. Both updates happen with 'mutex' held.
#
fun note_iod_oobder { io_descriptor, oobd_fn }
    =
    {   index = fdop_to_index( (psx::iod_to_fd io_descriptor), oobd_op );
        #
        pth::acquire_mutex mutex;
        #
        client_fns := im::set( *client_fns, index, oobd_fn );
        #
        wait_requests := note_oobd_request (*wait_requests, io_descriptor);
        #
        pth::release_mutex mutex;
    }
    where
        fun note_oobd_request ([], iod)                                 # Add 'oobd' request for given iod.
                =>                                                      # No existing entry: the new one must ask for oobd only (not 'write').
                [ { io_descriptor => iod, readable => FALSE, writable => FALSE, oobdable => TRUE } ];

            note_oobd_request (((req as { io_descriptor, readable, writable, oobdable }) ! rest), iod)
                =>
                if (io_descriptor == iod)   { io_descriptor, readable, writable, oobdable => TRUE } ! rest;     # Previous op request(s) for that iod, so just add 'oobd' to them.
                else                        req ! note_oobd_request (rest, iod);
                fi;
        end;
    end;
#
# Stop watching for opportunities to read out-of-band data from given iod.
#
# Removes the (fd, oobd_op) handler from client_fns and clears the
# oobd request for iod in wait_requests. Both updates happen with
# 'mutex' held.
#
fun drop_iod_oobder (iod: wio::Io_Descriptor) # Stop watching for opportunities to read out-of-band data from given iod.
=
{ index = fdop_to_index( (psx::iod_to_fd iod), oobd_op );
#
pth::acquire_mutex mutex;
#
client_fns := drop_fn (*client_fns, index);
#
wait_requests := drop_oobd_request (*wait_requests, iod);
#
pth::release_mutex mutex;
}
where
fun drop_oobd_request ([], iod) # Drop 'oobd' request for given iod.
=>
[];
# Entry asks for 'oobd' only:
drop_oobd_request ( ( (req as { io_descriptor, readable => FALSE,
writable => FALSE,
oobdable => TRUE
}
)
!
rest
),
iod
)
=>
if (io_descriptor == iod) rest; # Only request for that iod is 'oobd', so drop it completely.
else req ! drop_oobd_request (rest, iod); # Some other iod: keep entry and keep scanning.
fi;
#
# Entry asks for 'oobd' plus at least one other op:
drop_oobd_request (((req as { io_descriptor, readable, writable, oobdable => TRUE }) ! rest), iod)
=>
if (io_descriptor == iod) { io_descriptor, readable, writable, oobdable => FALSE } ! rest; # Multiple op requests for that iod, so drop only the 'oobd' one.
else req ! drop_oobd_request (rest, iod);
fi;
# Entry has no 'oobd' request at all: keep it unchanged and keep scanning.
drop_oobd_request (req ! rest, iod)
=>
req ! drop_oobd_request (rest, iod);
end;
end;
end;
#
# We do this to wake the main server pthread from its I/O wait sleep:
# write a single zero byte to the output end of our private pipe.
# The number of bytes written is not checked.
#
fun write_to_private_pipe ()
    =
    {   wakeup_slice                                                    # Just anything one byte long to write into our internal pipe.
            =
            vector_slice_of_one_byte_unts::make_full_slice              # vector_slice_of_one_byte_unts is from src/lib/std/src/vector-slice-of-one-byte-unts.pkg
                (vector_of_one_byte_unts::from_list                     # vector_of_one_byte_unts is from src/lib/std/src/vector-of-one-byte-unts.pkg
                    [ (one_byte_unt::from_int 0) ]
                );
        #
        pipe = the *private_pipe;
        #
        bytes_written                                                   # Ignored.
            =
            psx::write_vector (pipe.outfd, wakeup_slice);               # Write one byte into pipe.
    };
#
# Queue up a stop-server-pthread request for later execution by do_stop.
# External fn -- will execute in context of client pthread.
#
# With 'mutex' held, we push the request on the front of
# request_queue and then poke the private pipe so the
# server pthread wakes up and notices it.
#
fun stop (request: Do_Stop)
    =
    {   pth::acquire_mutex mutex;
        #
        queued = (DO_STOP request) ! *request_queue;
        #
        request_queue := queued;
        #
        write_to_private_pipe ();                                       # Wake up main server pthread to process request.
        #
        pth::release_mutex mutex;
    };
#
# Queue up an echo-back-to-client request for later execution by do_echo.
# (This is mostly for unit testing.)
# External fn -- will execute in context of client pthread.
#
# With 'mutex' held, we push the request on the front of
# request_queue and then poke the private pipe so the
# server pthread wakes up and notices it.
#
fun echo (request: Do_Echo)
    =
    {   pth::acquire_mutex mutex;
        #
        queued = (DO_ECHO request) ! *request_queue;
        #
        request_queue := queued;
        #
        write_to_private_pipe ();                                       # Wake up main server pthread to process request.
        #
        pth::release_mutex mutex;
    };
#
# Atomically take every pending request off request_queue,
# leaving the queue empty, and return them oldest-first.
#
fun get_new_requests ()
    =
    {   pth::acquire_mutex mutex;
        #
        newest_first = *request_queue;                                  # Clients push on the front, so the head is the most recent request.
        #
        request_queue := [];
        #
        pth::release_mutex mutex;
        #
        reverse newest_first;                                           # 'reverse' to restore original request ordering.
    };
#
# This is the outer loop for the io-wait server pthread.
#
# Each trip around the loop we:
#   1. Block in wio::wait_for_io_opportunity until some watched
#      descriptor is ready or the timeout expires.
#   2. Call the client handler(s) for each ready descriptor.
#   3. Service any queued client requests (stop/echo).
#   4. Call every per-loop fn once.
# The loop never returns; it ends when do_stop exits the pthread.
#
# NOTE(review): wait_requests, timeout, client_fns and per_loop_fns
# are read here without holding 'mutex' -- confirm that is intended.
#
fun server_loop ()
    =
    {   fds_ready_for_io
            =
            wio::wait_for_io_opportunity
              {
                wait_requests => *wait_requests,
                timeout       => THE *timeout
              };
        #
        apply process_io_ready_fd fds_ready_for_io;                     # Handle any new I/O opportunities.
        apply service_request (get_new_requests());                     # Handle any new requests from client pthreads.
        apply (fn f = *f()) *per_loop_fns;                              # Give the threadkit scheduler an irregular "clock" pulse to drive preemptive timeslicing.
        #
        server_loop ();
    }
    where
        fun service_request (DO_STOP r) => do_stop r;
            service_request (DO_ECHO r) => do_echo r;
        end;

        # Look up the client handler registered for (iod, op) and call it on iod.
        #
        # Clients add and drop handlers from their own pthreads while we are
        # blocked in the wait call, so a descriptor can be reported ready after
        # its handler was dropped. In that case there is nothing to call and
        # we silently skip it rather than raising out of the server loop.
        #
        fun call_client_fn (iod, op)
            =
            case (im::get( *client_fns, fdop_to_index ((psx::iod_to_fd iod), op) ))
                #
                THE client_fn =>  client_fn iod;
                NULL          =>  ();
            esac;

        fun process_io_ready_fd { io_descriptor => iod, readable, writable, oobdable }
            =
            {   pipe = the *private_pipe;
                #
                if (iod != psx::fd_to_iod pipe.infd)
                    #
                    # Normal case: An I/O opportunity has appeared
                    # on a client-specified fd, so call the corresponding
                    # client-supplied handler fn:
                    #
                    if (readable)   call_client_fn (iod, read_op );   fi;
                    if (writable)   call_client_fn (iod, write_op);   fi;
                    if (oobdable)   call_client_fn (iod, oobd_op );   fi;
                else
                    # Special case: A byte has been sent to us on our
                    # private pipe to wake us from our normal I/O wait.
                    # It means that request_queue holds client-pthread
                    # request(s) for us to process:
                    #
                    psx::read_as_vector                                 # Read and discard the byte that was sent to us.
                      {
                        file_descriptor   => pipe.infd,
                        max_bytes_to_read => 1
                      };
                    ();
                fi;
            };
    end;
#
# Start the io-wait server pthread if it is not already running.
#
# 'who' is a string identifying the client requesting the startup,
# for logging purposes.
#
# Returns TRUE if a new server pthread was started, FALSE if one was
# already running. In both cases wait_requests and timeout are reset
# to their defaults, so that clients see simple predictable semantics.
#
# All shared state is initialized BEFORE the mutex is released, so no
# other pthread holding the mutex can observe a recorded pid together
# with a missing or stale private pipe.
#
fun start who
    =
    {   pth::acquire_mutex mutex;
        #
        if (not (is_running ()))
            #
            fil::log .{ "io_wait_pthread: start: Called by per '" + who + "'."; };
            #
            pid := wxp::get_process_id ();
            #
            private_pipe := THE (psx::make_pipe ());                            # We do not close any existing pipe fds here because they might be stale stuff from before a heap dump/load cycle,
                                                                                # in which case closing them might close something we actually want this time around.
            wait_requests := default_wait_request_list (the *private_pipe);     # By default we listen only on our private pipe.
            timeout := tim::from_float_seconds 0.02;                            # Start up with timeout frequency set to 50Hz.
            #
            pth::release_mutex mutex;                                           # State is complete; safe to let clients in and to start the server pthread.
            #
            pth::spawn_pthread server_loop;
            TRUE;
        else
            # We re-initialize in this case also, so that
            # clients will see simple predictable semantics:
            # NB: Since we're running, we have a valid pipe;
            # there is no reason to create a new one here.
            #
            wait_requests := default_wait_request_list (the *private_pipe);
            timeout := tim::from_float_seconds 0.02;
            pth::release_mutex mutex;
            FALSE;
        fi;
    };
};
end;
#######################################################################
# Note[1]
# Most pthread servers block on their request_queue when not busy,
# and thus can be woken by a simple pth::broadcast_condvar, but
# here we will be spending almost all our time blocked in a select(),
# so that will not work.
#
# Consequently we use a dedicated pipe to wake our server pthread.
# By always including a read of this pipe in our wio::wait_for_io_opportunity
# call (C select()/poll()) we ensure that the server pthread can
# always be woken just by writing one byte to the pipe.
#
# NB: Modern Linux/Posix provide pselect()/ppoll() calls which can wait
# on signals as well as fds, so an alternative would be to use them
# and then wake-via-signal. But signals are public and finite in number
# whereas pipes are private and unbounded in number, so the pipe
# solution still seems cleaner, and efficiency is a non-issue in
# this application (we do not expect to use this mechanism heavily).


