## cpu-bound-task-hostthreads.pkg
#
# Server hostthreads to offload cpu-intensive computations
# from the main threadkit hostthread.
#
# See also:
#
#
src/lib/std/src/hostthread/io-bound-task-hostthreads.pkg#
src/lib/std/src/hostthread/io-wait-hostthread.pkg# Compiled by:
#
src/lib/std/standard.libstipulate
package fil = file__premicrothread; # file__premicrothread is from
src/lib/std/src/posix/file--premicrothread.pkg package hth = hostthread; # hostthread is from
src/lib/std/src/hostthread.pkg package wxp = winix__premicrothread::process; # winix__premicrothread::process is from
src/lib/std/src/posix/winix-process--premicrothread.pkgherein
package cpu_bound_task_hostthreads
: Cpu_Bound_Task_Hostthreads # Cpu_Bound_Task_Hostthreads is from
src/lib/std/src/hostthread/cpu-bound-task-hostthreads.api {
# One record for each request
# supported by the server:
#
Do_Stop = { per_who: String, reply: Void -> Void };
Do_Echo = { what: String, reply: String -> Void };
Request = DO_STOP Do_Stop # Union of above record types, so that we can keep them all in one queue.
| DO_ECHO Do_Echo
| DO_TASK (Void -> Void)
;
mutex = hth::make_mutex ();
condvar = hth::make_condvar ();
pid = REF 0;
running_servers_count = REF 0; # Count of servers running. Typically as many as cores on the machine, or maybe one less.
running_thunks_count = REF 0; # Count of servers actually processing a request, as opposed to just being blocked waiting for something to do.
external_request_queue = REF ([]: List(Request));
internal_request_queue = REF ([]: List(Request));
#
# We need two queues because clients will prepend
# requests to the external queue, leaving it in
# reverse order, but we want to run tasks in
# submission order. So periodically when the
# internal queue is empty we set it to the
# reversed contents of the external queue.
fun get_count_of_live_hostthreads ()
=
{
actual_pid = wxp::get_process_id (); # If the heap gets dumped to disk and then and reloaded, running_servers_count will be bogus.
# # We detect this by checking if the pid has changed. There is of course a small chance
if(*pid != actual_pid) # that by accident we still have the same pid after a save/reload, in which case we lose. XXX BUGGO FIXME.
pid := actual_pid; # A fix might be to have a generation number associated with each heap image which gets
# # incremented on every save/load cycle.
running_servers_count := 0;
fi;
*running_servers_count;
};
fun external_request_queue_is_empty () # We cannot write just fun request_queue_is_empty () = (*request_queue == []);
= # because Request is not an equality type. (The 'reply' fields are functions
case *external_request_queue [] => TRUE; # and Mythryl does not support comparison of functions for equality.)
_ => FALSE;
esac;
fun internal_request_queue_is_empty () # We cannot write just fun request_queue_is_empty () = (*request_queue == []);
= # because Request is not an equality type. (The 'reply' fields are functions
case *internal_request_queue [] => TRUE; # and Mythryl does not support comparison of functions for equality.)
_ => FALSE;
esac;
fun do_stop (r: Do_Stop) # Internal fn -- will execute in context of server hostthread.
=
{
r.reply ();
hth::acquire_mutex mutex;
#
running_servers_count := *running_servers_count - 1;
running_thunks_count := *running_thunks_count - 1;
hth::broadcast_condvar condvar; # This will in particular wake up the loop in change_number_of_server_hostthreads_to().
#
hth::release_mutex mutex;
#
hostthread::hostthread_exit ();
};
fun do_echo (r: Do_Echo) # Internal fn -- will execute in context of server hostthread.
=
r.reply r.what;
fun do_task (task: Void -> Void) # Internal fn -- will execute in context of server hostthread.
=
task ()
except _ = (); # Client thunk should never do this to us. We should log something if it does. XXX SUCKO FIXME.
###############################################
# The rest of the file is mostly boilerplate:
###############################################
fun stop_one_server_hostthread (request: Do_Stop) # External fn -- will execute in context of client hostthread.
=
{
hth::acquire_mutex mutex;
#
external_request_queue := (DO_STOP request) ! *external_request_queue;
#
hth::release_mutex mutex;
hth::broadcast_condvar condvar;
};
fun echo (request: Do_Echo) # External fn -- will execute in context of client hostthread.
=
{
hth::acquire_mutex mutex;
#
external_request_queue := (DO_ECHO request) ! *external_request_queue;
#
hth::release_mutex mutex;
hth::broadcast_condvar condvar;
};
fun do (task: Void -> Void) # External fn -- will execute in context of client hostthread.
=
{
hth::acquire_mutex mutex;
#
external_request_queue := (DO_TASK task) ! *external_request_queue;
#
hth::release_mutex mutex;
hth::broadcast_condvar condvar;
};
fun get_next_request ()
=
{
hth::acquire_mutex mutex;
#
running_thunks_count := *running_thunks_count - 1;
for (external_request_queue_is_empty ()
and internal_request_queue_is_empty ()
){
hth::wait_on_condvar (condvar, mutex);
};
running_thunks_count := *running_thunks_count + 1;
case *internal_request_queue
#
(task ! rest)
=>
{ internal_request_queue := rest;
#
hth::release_mutex mutex;
#
task;
};
[] =>
case (reverse *external_request_queue)
#
[] => raise exception DIE "impossible"; # The above 'for' loop condition guarantees one of the two queues is nonempty.
#
[ task ]
=>
{ external_request_queue := [];
hth::release_mutex mutex;
task;
};
(task ! rest)
=>
{ internal_request_queue := rest; # Refill internal queue from external one, reversing to restore original request ordering.
external_request_queue := [];
#
hth::release_mutex mutex;
#
task;
};
esac;
esac;
};
fun server_code () # This is the outer loop for each cycleserver hostthread.
=
{
hth::set_hostthread_name "cpu";
hth::acquire_mutex mutex;
#
running_servers_count := *running_servers_count + 1;
running_thunks_count := *running_thunks_count + 1; # This will be decremented at the top of get_next_request().
#
hth::release_mutex mutex;
hth::broadcast_condvar condvar; # This will in particular wake up the loop in change_number_of_server_hostthreads_to().
server_loop (); # Never returns.
}
where
fun service_request (DO_STOP r) => do_stop r;
service_request (DO_ECHO r) => do_echo r;
service_request (DO_TASK r) => do_task r;
end;
fun server_loop ()
=
{
service_request (get_next_request())
except x = { # NB: Moving this 'except' clause to position P below results in a bad memory leak.
printf "error: cpu::server_loop: Exception!\n";
printf "error: cpu::server_loop/exception name s='%s'\n" (exceptions::exception_name x);
printf "error: cpu::server_loop/exception msg s='%s'\n" (exceptions::exception_message x);
raise exception x; # Should probably shut down hard and sudden here. XXX SUCKO FIXME.
};
#
server_loop ();
}; # Position P.
end;
fun start_one_server_hostthread per_who
=
{
hth::spawn_hostthread server_code;
};
stipulate
startstop_mutex = hth::make_mutex (); # This mutex allows only one caller at a time into change_number_of_server_hostthreads_to().
herein
fun change_number_of_server_hostthreads_to per_who desired_hostthreads # Used both to run server hostthreads at system startup and also to stop them at system shutdown.
=
#
# Our job here is to start (or stop) enough hostthreads
# to make running_servers_count equal to desired_hostthreads:
#
hth::with_mutex_do startstop_mutex {. # Unlikely we'll ever have simultaneous calls, but lets be totally safe about it.
#
pid := wxp::get_process_id ();
current_hostthreads = get_count_of_live_hostthreads ();
if (current_hostthreads != desired_hostthreads)
#
# Start by ordering up the right number
# of hostthread births or deaths:
if (current_hostthreads < desired_hostthreads)
#
for (i = current_hostthreads;
i < desired_hostthreads;
++i
){
start_one_server_hostthread per_who; # 'per_who' just logs party responsible for starting up the hostthread.
};
else # current_hostthreads > desired_hostthreads
for (i = desired_hostthreads;
i < current_hostthreads;
++i
){
stop_one_server_hostthread { per_who, reply => (\\ _ = ()) };
};
fi;
# Finish up by waiting until actual number of
# hostthreads matches request.
#
# It would be nice to have a timeout here which
# logged an abort message and crashed out if things
# took too long, but that seems nontrivial with the
# current hostthread api, so we'll be less ambitious:
hostthread::acquire_mutex mutex; # This mutex serializes access to running_servers_count.
#
for (*running_servers_count != desired_hostthreads) {
#
hostthread::wait_on_condvar (condvar, mutex); # This condvar will wake us each time running_servers_count changes.
};
#
hostthread::release_mutex mutex;
fi;
};
end;
fun is_doing_useful_work ()
#
# This is support for
#
# no_runnable_threads_left__fate
# from
#
src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg #
# which is tasked with exit()ing if the system is
# deadlocked -- which is to say, no thread ready
# to run and provably no prospect of ever having
# a thread ready to run.
#
# If we have any hostthread currently processing a request
# then it may in due course generate a reply waking up
# a thread, so the system is not provably deadlocked and
# no_runnable_threads_left__fate() should not exit:
=
{
hostthread::acquire_mutex mutex;
#
got_something_running = *running_thunks_count > 0;
external_queue_is_nonempty = case *external_request_queue [] => FALSE;
_ => TRUE;
esac;
internal_queue_is_nonempty = case *internal_request_queue [] => FALSE;
_ => TRUE;
esac;
#
hostthread::release_mutex mutex;
doing_something = got_something_running
or external_queue_is_nonempty
or internal_queue_is_nonempty;
doing_something;
};
};
end;
## Code by Jeff Prothero: Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.