


## cpu-bound-task-pthreads.pkg
#
# Server pthreads to offload cpu-intensive computations
# from the main threadkit pthread.
#
# See also:
#
#     src/lib/std/src/pthread/io-bound-task-pthreads.pkg
#     src/lib/std/src/pthread/io-wait-pthread.pkg

# Compiled by:
#     src/lib/std/standard.lib

stipulate
package fil = file; # file is from src/lib/std/src/posix/file.pkg
package pth = pthread; # pthread is from src/lib/std/src/pthread.pkg
package wxp = winix::process; # winix::process is from src/lib/std/src/posix/winix-process.pkg
herein
package cpu_bound_task_pthreads
: Cpu_Bound_Task_Pthreads # Cpu_Bound_Task_Pthreads is from src/lib/std/src/pthread/cpu-bound-task-pthreads.api
{
# One record for each request
# supported by the server:
#
# Do_Stop: asks a server pthread to shut down.
#          'who' is included in the shutdown log message;
#          'reply' is called (with no argument) before the pthread exits.
Do_Stop = { who: String, reply: Void -> Void };
# Do_Echo: asks a server pthread to call 'reply' on 'what'.
Do_Echo = { what: String, reply: String -> Void };
# Request: tagged union of everything that can sit in a request queue.
#          DO_TASK carries a thunk which the server simply runs.
Request = DO_STOP Do_Stop # Union of above record types, so that we can keep them all in one queue.
| DO_ECHO Do_Echo
| DO_TASK (Void -> Void)
;
# Shared state for all server pthreads and their clients.
#
# 'mutex' is held by the enqueueing fns and by get_next_request()
# while they read or modify the two request queues.
# 'condvar' is broadcast on every enqueue and waited on by
# get_next_request() while both queues are empty.
mutex = pth::make_mutex ();
condvar = pth::make_condvar ();
# Process id last seen by get_count_of_running_pthreads();
# a change is taken to mean the heap was saved and reloaded.
pid = REF 0;
running_servers_count = REF 0; # Count of servers running. Typically as many as cores on the machine, or maybe one less.
# Clients prepend to external_request_queue;
# servers consume from internal_request_queue.
external_request_queue = REF ([]: List(Request));
internal_request_queue = REF ([]: List(Request));
#
# We need two queues because clients will prepend
# requests to the external queue, leaving it in
# reverse order, but we want to run tasks in
# submission order. So periodically when the
# internal queue is empty we set it to the
# reversed contents of the external queue.
fun get_count_of_running_pthreads ()
    =
    {   # Return the number of server pthreads believed to be running.
        #
        # If the heap gets dumped to disk and then reloaded,
        # running_servers_count will be bogus.  We detect this
        # by checking whether the process id has changed since
        # we last looked; if so we remember the new pid and
        # reset the count to zero.
        #
        # There is of course a small chance that by accident we
        # still have the same pid after a save/reload, in which
        # case we lose.  XXX BUGGO FIXME.  A fix might be to have
        # a generation number associated with each heap image
        # which gets incremented on every save/load cycle.
        #
        current_pid = wxp::get_process_id ();
        #
        if (current_pid != *pid)
            #
            pid                   := current_pid;
            running_servers_count := 0;
        fi;
        #
        *running_servers_count;
    };
fun external_request_queue_is_empty ()
    =
    # TRUE iff no requests are waiting in the external (client-side) queue.
    #
    # We pattern-match rather than writing (*external_request_queue == [])
    # because Request is not an equality type:  the 'reply' fields are
    # functions, and Mythryl does not support comparing functions for equality.
    #
    case *external_request_queue
        #
        (_ ! _) =>  FALSE;
        []      =>  TRUE;
    esac;
fun internal_request_queue_is_empty ()
    =
    # TRUE iff no requests are waiting in the internal (server-side) queue.
    #
    # We pattern-match rather than writing (*internal_request_queue == [])
    # because Request is not an equality type:  the 'reply' fields are
    # functions, and Mythryl does not support comparing functions for equality.
    #
    case *internal_request_queue
        #
        (_ ! _) =>  FALSE;
        []      =>  TRUE;
    esac;
fun do_stop (r: Do_Stop)    # Internal fn -- will execute in context of server pthread.
    =
    {   # Service a DO_STOP request:  acknowledge it, log it,
        # decrement the running-server count and terminate
        # the calling pthread.  Does not return.
        #
        r.reply ();
        #
        fil::log .{ "src/lib/std/src/pthread/cpu-bound-task-pthreads.pkg: Shutting down per request from '" + r.who + "'."; };
        #
        # running_servers_count (and pid, via get_count_of_running_pthreads)
        # is shared with start() and with every other server pthread, so the
        # read-modify-write must happen under the mutex, just as in start().
        # get_next_request() released the mutex before handing us this
        # request, so acquiring it here cannot self-deadlock.
        #
        pth::acquire_mutex mutex;
            #
            running_servers_count
                :=
                get_count_of_running_pthreads () - 1;
            #
        pth::release_mutex mutex;
        #
        pth::pthread_exit ();
    };
fun do_echo (r: Do_Echo)    # Internal fn -- will execute in context of server pthread.
    =
    {   # Service a DO_ECHO request by handing the
        # submitted string straight back to the client.
        #
        text = r.what;
        #
        r.reply text;
    };
fun do_task (task: Void -> Void)    # Internal fn -- will execute in context of server pthread.
    =
    {   # Service a DO_TASK request by running the submitted thunk.
        #
        task ();
    };
###############################################
# The rest of the file is mostly boilerplate:
###############################################
fun enqueue_request (request: Request)    # Private helper -- will execute in context of client pthread.
    =
    {   # Prepend 'request' to the external queue and wake the servers.
        # Prepending leaves the external queue in reverse submission
        # order; get_next_request() is responsible for undoing that.
        #
        pth::acquire_mutex mutex;
            #
            external_request_queue :=  request ! *external_request_queue;
            #
            pth::broadcast_condvar condvar;
            #
        pth::release_mutex mutex;
    };

fun stop (request: Do_Stop)    # External fn -- will execute in context of client pthread.
    =
    # Ask one server pthread to shut down.
    #
    enqueue_request (DO_STOP request);

fun echo (request: Do_Echo)    # External fn -- will execute in context of client pthread.
    =
    # Ask a server pthread to call request.reply on request.what.
    #
    enqueue_request (DO_ECHO request);

fun do (task: Void -> Void)    # External fn -- will execute in context of client pthread.
    =
    # Ask a server pthread to run 'task'.
    #
    enqueue_request (DO_TASK task);
fun get_next_request ()
    =
    {   # Block until a request is available, then remove and return
        # the oldest pending request.  Called by server pthreads.
        # The mutex is held only while the queues are being examined
        # or modified;  it is released before we return.
        #
        pth::acquire_mutex mutex;
        #
        for (external_request_queue_is_empty ()
        and  internal_request_queue_is_empty ()
        ){
            #
            pth::wait_on_condvar (condvar, mutex);
        };

        case *internal_request_queue
            #
            (request ! rest)
                =>
                {   internal_request_queue := rest;
                    #
                    pth::release_mutex mutex;
                    #
                    request;
                };

            [] =>
                # Internal queue is drained:  refill it from the external one.
                #
                # Clients PREPEND to the external queue, so its head is the
                # NEWEST request.  We must reverse the whole list before
                # taking the head, otherwise the newest request would jump
                # ahead of everything submitted before it.
                #
                case (reverse *external_request_queue)
                    #
                    (request ! rest)
                        =>
                        {   internal_request_queue := rest;
                            external_request_queue := [];
                            #
                            pth::release_mutex mutex;
                            #
                            request;
                        };

                    [] =>
                        {   # The above 'for' loop condition guarantees one of
                            # the two queues is nonempty, so this should be
                            # unreachable -- but do not leave the mutex locked
                            # behind us if it ever happens.
                            #
                            pth::release_mutex mutex;
                            #
                            raise exception FAIL "impossible";
                        };
                esac;
        esac;
    };
fun server_loop ()    # This is the outer loop for each server pthread.
    =
    {   # Fetch the next request (blocking if none is pending),
        # dispatch it on its constructor, then go around again.
        #
        case (get_next_request ())
            #
            DO_STOP r =>  do_stop r;
            DO_ECHO r =>  do_echo r;
            DO_TASK r =>  do_task r;
        esac;
        #
        server_loop ();
    };
fun start who
    =
    {   # Spawn one more server pthread running server_loop.
        #
        # 'who' identifies the caller;  it is used only in the log message.
        # Returns the running-server count as recorded by this call.
        #
        pth::acquire_mutex mutex;
            #
            fil::log .{ "src/lib/std/src/pthread/cpu-bound-task-pthreads.pkg: Starting up server loop in response to '" + who + "'."; };
            #
            # Do NOT assign 'pid' here:  get_count_of_running_pthreads()
            # compares the remembered pid against the current one to detect
            # a heap save/reload and reset the stale count to zero, and it
            # updates 'pid' itself.  Overwriting 'pid' first would make that
            # check always pass and leave a bogus count in place.
            #
            count = get_count_of_running_pthreads () + 1;
            #
            running_servers_count := count;
            #
        pth::release_mutex mutex;
        #
        pth::spawn_pthread server_loop;
        #
        # Return the value captured under the mutex rather than re-reading
        # the shared REF cell unlocked, where another pthread may
        # already have changed it.
        #
        count;
    };
};
end;
## Code by Jeff Prothero: Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.


