## makelib-thread-boss.pkg
# Compiled by:
#     src/app/makelib/concurrency/makelib-concurrency.sublib
#
# OVERVIEW
# ========
#
# This is a very simple thread package used by
#
#     src/app/makelib/compile/compile-in-dependency-order-g.pkg
#     src/app/makelib/mythryl-compiler-compiler/mythryl-compiler-compiler-g.pkg
#
# when running multiple copies of the compiler in
# parallel as unix subprocesses so as to save wall-clock
# time when compiling on multi-core machines.
#
# This package does nothing that thread-kit doesn't do better;
# it is here only so we can do parallel compiles even when
# thread-kit isn't installed.
#
#
#
# NB: Throughout this file, 'thread' refers to lightweight
# application-specific threads, not heavyweight threads
# managed by the OS kernel. In other words, we are here
# concerned at the in-process level with multiprogramming,
# not multiprocessing. (Our unix subprocesses do give
# us multiprocessing, of course.)
#
#
#
# We create threads using a 'make_makelib_thread' command
# which takes a function to compute and returns a 'thread'
# object to the compiler:
#
# thread = sit::make_makelib_thread {. something_to_compute (); };
#
# We also supply a 'wait_for_thread_to_finish_then_return_result' function which may be applied
# to such thread objects, and which yields the final result
# computed by the thread:
#
# result = sit::wait_for_thread_to_finish_then_return_result my_thread;
#
# Calling 'wait_for_thread_to_finish_then_return_result' on a thread T which has not yet
# completed its computation blocks the caller until T terminates.
#
# Thus, a thread exists in one of two states:
#
# o RUNNING, when it is associated with a queue of blocked
# threads waiting to read its result value, and
#
# o DONE, when it is associated with its return value.
#
#
#
#
# Virtual Threads
# ---------------
#
# "Virtual threads" essentially make our thread wait queues
# available without the bother of having an actual thread.
#
# A virtual thread is created via
#
# my_virtual_thread = sit::spawn_virtual_thread ();
#
# Note that no actual function to compute is specified,
# and none ever exists. But threads which do
#
# sit::wait_for_thread_to_finish_then_return_result my_virtual_thread;
#
# block just as though it was a regular running thread.
#
# Since there is no actual code running, by default these
# threads would block forever. Consequently, we provide a
#
# sit::terminate_virtual_thread
# #
# my_virtual_thread;
#
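# call which simulates termination of the virtual thread
# and unblocks all threads waiting on its result
# (which is always Void).
#
# (In the Makelib_Thread_Boss api below, these two operations
# are named make_wait_queue and run_all_threads_in_wait_queue,
# and each takes the Makelib_Thread_Boss as its first argument.)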
#
# Thus, virtual threads facilitate simple kinds of
# manual thread scheduling without having to sink
# all the way to the callcc level.
#
#
#
#
# Stream Proxy Threads
# --------------------
#
# Since the main purpose of this package is to support
# compiler interaction with multiple compiler subprocesses,
# we need to have a way for a thread to block
# until input becomes available from a given unix pipe.
# Stream proxy threads answer this need.
#
# A stream proxy thread is created by doing
#
# my_proxy_thread
# =
# sit::make_unix_pipe_input_wait_queue
# #
# (pipe: file::Input_Stream);
#
# where the stream in question is a pipe from
# a fork()ed unix subprocess. Any thread which
# reads the "result" of such a proxy thread via
#
# sit::wait_for_thread_to_finish_then_return_result my_proxy_thread;
#
# will then block until input is available from
# that unix subprocess.
#
# Underneath, this is implemented using a unix
# select() / poll() call; stream proxy threads
# are essentially a graceful way of providing
# thread access to select() / poll() functionality.
#
#
#
# Priorities
# ----------
#
# To provide some minimal control over scheduling
# of runnable threads, we allow threads to specify
# integer priorities by calling
#
# sit::wait_for_thread_to_finish_then_return_result_running_at_priority some_priority some_thread;
#
# instead of just
#
# sit::wait_for_thread_to_finish_then_return_result some_thread;
#
# which resumes at the minimum priority. When multiple threads are ready to run,
# the thread with the highest priority is selected:
#
# low_priority = 1;
# high_priority = 999;
#
# starting_gun = sit::spawn_virtual_thread ();
#
# goes_first = sit::make_thread {. sit::wait_for_thread_to_finish_then_return_result_running_at_priority high_priority starting_gun; print "I went first!\n"; };
# goes_second = sit::make_thread {. sit::wait_for_thread_to_finish_then_return_result_running_at_priority low_priority starting_gun; print "I went second.\n"; };
#
# sit::terminate_virtual_thread starting_gun;
#
# The above will result in 'goes_first' and 'goes_second'
# executing in the obvious order.
#
#
#
# IMPLEMENTATION NOTES
# ====================
#
# Since the point of this package is to provide an
# extremely lightweight alternative to the full
# thread-kit package, we keep things simple by
# providing bare-minimum functionality:
#
# o We do no pre-emption. The only way
# for a thread to give up control of
# the processor is to terminate or
# call sit::wait_for_thread_to_finish_then_return_result on another thread.
#
# o We do select() / poll() calls to check
# for input from subprocesses only when
# sit::wait_for_thread_to_finish_then_return_result is called
# and there are no runnable threads.
#
# o For a priority queue we use a simple
# list maintained using O(N**2) insertion
# sort.
#
# These simplifications work fine for
#
#     src/app/makelib/compile/compile-in-dependency-order-g.pkg
#     src/app/makelib/mythryl-compiler-compiler/mythryl-compiler-compiler-g.pkg
#
# which are written with them in mind, but may
# easily cause problems in general. For example,
# spewing commands to the subprocesses without
# pausing to read return results from them will
# quickly produce deadlock.
#
# So, in general -- use thread-kit. :)
# "Too much work and too much energy
# kill a man just as effectively as
# too much assorted vice or too much drink."
#
# -- Rudyard Kipling
stipulate
package fil = file__premicrothread; # file__premicrothread is from src/lib/std/src/posix/file--premicrothread.pkg
package wio = winix__premicrothread::io; # winix__premicrothread::io is from src/lib/std/src/posix/winix-io--premicrothread.pkg
herein
api Makelib_Thread_Boss {
#
# Interface to a small cooperative (non-preemptive) thread scheduler.
# Every operation takes the Makelib_Thread_Boss holding the scheduler
# state, so no state is kept in globals.
#
Makelib_Thread(X); # Externally visible type of a thread.
# X is the thread result type.
Makelib_Thread_Boss; # Scheduler state; its representation is not visible through this api.
make_makelib_thread_boss: Void -> Makelib_Thread_Boss; # Create a fresh scheduler state with no threads and no watched pipes.
make_makelib_thread # Start a new thread which evaluates the given thunk; the thunk's result becomes the thread's result.
:
Makelib_Thread_Boss
->
(Void -> X) # Code to run in thread.
->
Makelib_Thread(X);
wait_for_thread_to_finish_then_return_result: Makelib_Thread_Boss -> Makelib_Thread(X) -> X; # Waits, then resumes running at minimum priority.
wait_for_thread_to_finish_then_return_result_running_at_priority: Makelib_Thread_Boss -> Int -> Makelib_Thread(X) -> X; # Waits, then resumes running at given priority.
#
# Priority when using wait_for_thread_to_finish_then_return_result_running_at_priority
# is always higher than when using wait_for_thread_to_finish_then_return_result
# (the implementation uses priority+1 versus 0, so this holds for
# non-negative priority arguments).
read_line_from_unix_pipe # While waiting this will yield the process, allowing other Makelib_Threads to run.
:
Makelib_Thread_Boss
-> fil::Input_Stream
-> Null_Or( String );
make_wait_queue # Create a "thread" which will (only) "terminate" when run_all_threads_in_wait_queue is called.
: #
Makelib_Thread_Boss -> Makelib_Thread( Void );
run_all_threads_in_wait_queue # Move to the run queue all threads in wait queue.
: # Does nothing if the wait queue has already been released.
Makelib_Thread_Boss
-> Makelib_Thread( Void )
-> Void;
reset_thread_manager: Makelib_Thread_Boss -> Void; # Forget all threads and threads waiting on them.
no_runnable_threads: Makelib_Thread_Boss -> Bool; # Check whether there are any (other) runnable threads.
get_cores_in_use: Makelib_Thread_Boss -> Int; # Read the cores-in-use counter stored in the boss.
set_cores_in_use: (Makelib_Thread_Boss, Int) -> Void; # Overwrite the cores-in-use counter stored in the boss.
};
end;
stipulate
package fat = fate; # fate is from src/lib/std/src/nj/fate.pkg
package fil = file__premicrothread; # file__premicrothread is from src/lib/std/src/posix/file--premicrothread.pkg
package pur = file__premicrothread::pur; # file__premicrothread::pur is from src/lib/std/src/io/winix-text-file-for-os-g--premicrothread.pkg
package tbi = winix_base_text_file_io_driver_for_posix__premicrothread; # winix_base_text_file_io_driver_for_posix__premicrothread is from src/lib/std/src/io/winix-base-text-file-io-driver-for-posix--premicrothread.pkg
package wio = winix__premicrothread::io; # winix__premicrothread::io is from src/lib/std/src/posix/winix-io--premicrothread.pkg
package wnx = winix__premicrothread; # winix__premicrothread is from src/lib/std/winix--premicrothread.pkg
herein
package makelib_thread_boss
: Makelib_Thread_Boss
{
# Type for (thread, priority) pairs in our priority queues.
# The "thread" half is the captured fate (continuation) of a
# suspended thread; resuming it is done with fat::switch_to_fate:
#
Priority_Queue_Entry
=
(fat::Fate( Void ), Int); # (Thread, Priority)
# The two states a thread can be in:
Thread_Vim(X)
#
= DONE(X) # Value of thread.
| RUNNING List( Priority_Queue_Entry )
# Threads waiting for thread value to be computed.
;
#
# Type to hold thread state.
# State is initially RUNNING, carrying a list of
# threads waiting for the result of this thread.
# Once the thread result value is known,
# we change the state to DONE(result), and move all
# the previously waiting threads to the run queue:
Makelib_Thread(X)
=
Ref( Thread_Vim(X) );
Thread_Priority_Queue # Type for simple, brain-dead priority queue. Entries are (thread, priority) pairs.
= # The list is kept sorted with the highest priority first (see enqueue).
Ref( List( Priority_Queue_Entry ) );
# Encapsulates the state for this package.
# SML/NJ keeps it all in global variables, we keep it in Makelib_State
# from src/app/makelib/main/makelib-state.pkg
#
Makelib_Thread_Boss
    =
    { runnable_threads_priority_queue:  Thread_Priority_Queue,                                          # Suspended threads which are ready to resume, highest priority first.
      #
      unix_pipes_to_watch:              Ref( List( ( Makelib_Thread( Void ), wio::Ioplea ) ) ),         # (proxy thread, poll request) pairs polled when nothing is runnable.
      #
      cores_in_use:                     Ref( Int ),                                                     # Counter read/written via get_cores_in_use / set_cores_in_use.
      #
      core_wait_queue:                  Ref( Thread_Vim( Void ) )                                       # Initialized to RUNNING []; not otherwise referenced in this file.
    };
# Build a fresh scheduler state: empty run queue, no watched
# pipes, zero cores in use and an empty core wait queue.
#
fun make_makelib_thread_boss (): Makelib_Thread_Boss
    =
    { runnable_threads_priority_queue =>  (REF []): Thread_Priority_Queue,
      unix_pipes_to_watch             =>  REF ([]: List( ( Makelib_Thread( Void ), wio::Ioplea ) ) ),
      cores_in_use                    =>  REF 0,                # We use this to avoid running more compile child-processes than cores in src/app/makelib/compile/compile-in-dependency-order-g.pkg
      core_wait_queue                 =>  REF (RUNNING [])      # We use this to wait for an available core in src/app/makelib/compile/compile-in-dependency-order-g.pkg
    };
# Return the current value of the boss's cores_in_use counter:
fun get_cores_in_use (boss: Makelib_Thread_Boss) = *boss.cores_in_use;
# Overwrite the boss's cores_in_use counter with 'i' (no range checking is done):
fun set_cores_in_use (boss: Makelib_Thread_Boss, i: Int) = boss.cores_in_use := i;
stipulate
fun enqueue (queue_ref, new_entry as (_, new_priority))
    =
    {   # Add the (value, priority) pair 'new_entry' to the priority
        # queue held in 'queue_ref', updating the ref in place.
        # The list stays ordered from highest to lowest priority.
        #
        # Each call walks the list once, so filling a queue of N
        # entries this way costs O(N**2) overall.
        #
        fun insert_sorted entries
            =
            case entries
                #
                []  =>  [ new_entry ];
                #
                (head as (_, head_priority)) ! tail
                    =>
                    # NB: The comparison must be ">=" and not ">": a new entry
                    # goes in FRONT of existing entries of equal priority.
                    # With ">" the code in mythryl-compiler-compiler-g.pkg
                    # would not perform as desired -- the parser thread would
                    # end up being scheduled first, effectively preventing the
                    # "cmb" message from being sent to the server processes.
                    # (With preemption this would not be a problem.)
                    #
                    if (new_priority >= head_priority)   new_entry ! entries;
                    else                                 head ! insert_sorted tail;
                    fi;
            esac;

        queue_ref := insert_sorted (*queue_ref);
    };
# Remove the head of the priority queue, updating the
# queue ref in place.  Yields THE removed entry, or NULL
# (leaving the queue untouched) when the queue is empty:
#
fun dequeue queue_ref
    =
    case (*queue_ref)
        #
        []  =>  NULL;
        #
        head ! remaining
            =>
            {   queue_ref := remaining;
                #
                THE head;
            };
    esac;
# Record that a thread has finished with value 'result'.
#
# 'thread_state' is the thread's state cell.  While the thread
# is RUNNING the cell carries the list of threads blocked on it.
# We overwrite the cell with DONE (result) and then hand every
# blocked thread, in list order, to the run queue.
#
# Finding the cell already DONE is a fatal error: we
# report it and raise DIE.
#
fun handle_thread_termination (boss: Makelib_Thread_Boss) (thread_state, result)
    =
    case (*thread_state)
        #
        RUNNING waiters
            =>
            {   thread_state := DONE result;
                #
                apply
                    (\\ waiter = enqueue (boss.runnable_threads_priority_queue, waiter))
                    waiters;
            };

        DONE _
            =>
            {   fil::say {. "thread terminated twice!"; };
                #
                raise exception DIE "thread";
            };
    esac;
#
# Block at the unix level until at least one watched pipe is ready,
# wake the threads waiting on the ready pipes, then switch to the
# highest-priority runnable thread.  Control leaves this function
# via fat::switch_to_fate or via a DIE exception.
#
# Raises DIE "thread" if there are no pipes to watch ("deadlock!")
# or if nothing became runnable after the poll.
#
fun select_on_input_file_descriptors boss
=
case *boss.unix_pipes_to_watch
#
[] => { fil::say {. "deadlock!"; }; # Nothing runnable and nothing to poll: no thread can ever make progress.
#
raise exception DIE "thread";
};
subprocesses
=>
{ # Get a poll list of unix descriptors
# corresponding to our unix subprocesses:
#
wait_requests
=
map #2 subprocesses;
# Do a Unix-level poll on the descriptors of our watched pipes.
# Since there is nothing else to do (we are only
# called if 'runnable_threads_priority_queue' is empty),
# we can afford to block at the unix process level
# until at least one of them is ready.
# (timeout => NULL presumably means "wait indefinitely" -- TODO confirm.)
#
# NOTE(review): the two printf calls below look like leftover debug
# tracing (the rest of this file reports via fil::say) -- confirm
# whether they are still wanted.
#
printf "src/app/makelib/concurrency/makelib-thread-boss.pkg: calling wio::wait_for_io_opportunity\n";
wait_results
=
wio::wait_for_io_opportunity { wait_requests, timeout => NULL };
printf "src/app/makelib/concurrency/makelib-thread-boss.pkg: back from wio::wait_for_io_opportunity\n";
# Partition our watch list into 'ready' entries, whose
# io descriptor appears in the poll results, and
# 'not_ready' entries, whose descriptor does not:
#
fun is_ready (_, ioplea: wio::Ioplea)
=
{ fun same_io_descriptor (poll_result: wio::Ioplea_Result)
=
wio::compare
(
ioplea.io_descriptor,
poll_result.io_descriptor
)
==
EQUAL;
list::exists
same_io_descriptor
wait_results;
};
(list::partition is_ready subprocesses)
->
(ready, not_ready);
# Schedule the 'not_ready' entries
# to be checked again next time around:
#
boss.unix_pipes_to_watch
:=
not_ready;
# For each 'ready' list entry (which represents
# a unix subprocess pipe stream which now has
# output available for us to read) mark the matching
# proxy thread as terminated, which moves to the
# run queue any threads waiting to read its result:
#
apply
(\\ (proxy_thread, _) = handle_thread_termination boss (proxy_thread, ()))
ready;
# There should now be a ready-to-run
# thread in the run queue, since:
#
# (1) The above poll doesn't return until a
# subprocess has something to read;
#
# (2) We then set the thread corresponding
# to that subprocess to DONE, which moves all
# threads waiting on it to the run queue;
#
# (3) There should be at least one such thread
# -- that which spawned that unix subprocess.
#
# So -- switch to the highest-priority thread in
# runnable_threads_priority_queue:
#
case (dequeue boss.runnable_threads_priority_queue)
#
NULL =>
{
fil::say {. "src/app/makelib/concurrency/makelib-thread-boss.pkg: select_on_input_file_descriptors failed to wake anybody up!"; };
#
raise exception DIE "thread";
};
THE (thread_state, _)
=>
{
fat::switch_to_fate thread_state (); # Resume the suspended thread; 'thread_state' here is its captured fate.
};
esac;
};
esac; # fun select_on_input_file_descriptors ()
fun run_highest_priority_runnable_thread_else_select_on_input_file_descriptors boss
    =
    {   # Choose what runs next and transfer control to it.
        #
        # The run queue is ordered by priority, so its head is the
        # highest-priority runnable thread.  When the run queue is
        # empty there is nothing local to run, and we instead wait
        # for input from one of our forked unix subprocesses.
        #
        next_runnable
            =
            dequeue boss.runnable_threads_priority_queue;

        case next_runnable
            #
            NULL                 =>  select_on_input_file_descriptors boss;     # Wait for Unix-level input.
            #
            THE (next_fate, _)   =>  fat::switch_to_fate next_fate ();          # Run local fate.
        esac;
    };
herein
#
fun reset_thread_manager (boss: Makelib_Thread_Boss) # Forget all runnable threads and all watched unix pipes.
=
# Empties the run queue and the unix_pipes_to_watch list.
# The cores_in_use counter and core_wait_queue are NOT touched here.
# Threads parked on individual Makelib_Thread wait lists are not
# visited either; they simply can no longer be reached via the boss.
#
{ boss.runnable_threads_priority_queue := [];
boss.unix_pipes_to_watch := [];
};
#
# TRUE iff the run queue currently holds no entries.
# (The calling thread is itself never in the run queue while
# it is executing, hence "no OTHER runnable threads".)
#
fun no_runnable_threads (boss: Makelib_Thread_Boss)
    =
    case (*boss.runnable_threads_priority_queue)
        #
        []  =>  TRUE;
        _   =>  FALSE;
    esac;
#
# Create a "virtual thread": a thread cell in state RUNNING with
# no waiters and no code behind it.  Threads which wait on it stay
# blocked until run_all_threads_in_wait_queue is applied to it.
# The 'boss' argument is not used; nothing is registered with the boss.
#
fun make_wait_queue boss
=
(REF (RUNNING [])): Makelib_Thread( Void );
#
# Release a wait queue made by make_wait_queue:  mark it DONE and
# move every thread blocked on it to the run queue.
# Releasing a queue which is already DONE is a harmless no-op.
#
fun run_all_threads_in_wait_queue boss wait_queue
    =
    case (*wait_queue)
        #
        DONE ()    =>  ();
        RUNNING _  =>  handle_thread_termination boss (wait_queue, ());
    esac;
stipulate
# Read thread result, with given priority.
#
# This suspends execution of the current
# thread until target thread finishes.
#
# A thread's priority has no effect while
# it is waiting, but becomes its scheduling
# priority once it is ready to run -- when
# a thread with multiple waiting threads
# finishes, the highest-priority waiter
# runs first.
#
# When we do finally run again, the value
# of the thread becomes the return value
# of the result call:
#
fun wait_for_thread_to_finish_then_return_result_running_at_priority' boss _ (REF (DONE result))
=>
# Thread is DONE, just return its result:
#
result;
wait_for_thread_to_finish_then_return_result_running_at_priority' boss priority (thread_state as REF (RUNNING thread_state_list))
=>
# Thread is not done: Add ourself to thread's wait
# queue and schedule some other thread to run:
#
{
fat::call_with_current_fate
#
(\\ current_thread
=
{
# 'current_thread' is our own fate (continuation).  Park it on
# the target's wait list; it is resumed, with (), once it has been
# moved to the run queue and dequeued by the scheduler.
thread_state := RUNNING ((current_thread, priority) ! thread_state_list);
#
# Hand the processor to someone else.  This call does not return here.
run_highest_priority_runnable_thread_else_select_on_input_file_descriptors boss;
}
);
# We have been resumed.  Re-examine the target's state: normally it is
# now DONE and the first clause returns its result; if it is still
# RUNNING we simply wait again.
wait_for_thread_to_finish_then_return_result_running_at_priority' boss priority thread_state;
};
end;
herein
#
# Wait for 'thread' and return its result, resuming at priority 0.
# (Freshly made threads are queued at priority -1 by make_makelib_thread,
# so a resumed waiter is scheduled ahead of threads which have not yet started.)
fun wait_for_thread_to_finish_then_return_result boss thread
=
wait_for_thread_to_finish_then_return_result_running_at_priority' boss 0 thread;
# Wait for 'thread' and return its result, resuming at the given priority.
# The priority is bumped by one so that, for non-negative arguments, it is
# strictly above the priority 0 used by wait_for_thread_to_finish_then_return_result.
fun wait_for_thread_to_finish_then_return_result_running_at_priority boss priority thread
=
wait_for_thread_to_finish_then_return_result_running_at_priority' boss (priority + 1) thread;
end;
# Fire off an internal thread.
#
# 'thunk_for_thread_to_run' is the thunk
# to be evaluated by the new thread.
#
# Return the matching Makelib_Thread.  The thunk does not start
# executing during this call: the new thread is only placed in the
# run queue (at priority -1) and runs when the scheduler picks it.
#
fun make_makelib_thread boss thunk_for_thread_to_run
=
{ new_thread = REF (RUNNING []); # The value we return to caller.
# We capture two fates here in succession:
# o 'return_fate', which is the caller's fate: resuming it makes make_makelib_thread return;
# o 'thread_fate' which is the make_makelib_thread()ed thread, which will
# 1. Evaluate thunk_for_thread_to_run ()
# 2. Store thunk_for_thread_to_run()'s result into 'new_thread',
# waking any threads waiting on it.
# 3. Terminate the thread by calling
# run_highest_priority_runnable_thread_else_select_on_input_file_descriptors boss.
# We put 'thread_fate' in the run queue and then
# have make_makelib_thread return 'new_thread':
#
fat::call_with_current_fate
#
(\\ return_fate
=
{ fat::call_with_current_fate
#
(\\ thread_fate
=
{ enqueue (boss.runnable_threads_priority_queue, (thread_fate, -1));
#
fat::switch_to_fate return_fate new_thread; # Return 'new_thread' to our caller; the code below runs only when 'thread_fate' is later resumed.
}
);
# Execution reaches this point only when the scheduler resumes 'thread_fate':
handle_thread_termination boss (new_thread, thunk_for_thread_to_run ());
# The thread is finished; pass control on.  This call does not return.
run_highest_priority_runnable_thread_else_select_on_input_file_descriptors boss;
}
);
};
stipulate
fun make_unix_pipe_input_wait_queue
#
(boss: Makelib_Thread_Boss)
#
(input_stream: fil::Input_Stream)
=
# Construct and return a "thread"
# which will "terminate" when the
# given (unix pipe) input stream
# has data to be read.
#
# As a side effect, we add the returned
# thread plus the Unix poll descriptor
# to our unix_pipes_to_watch.
#
# We implement the required functionality by
# polling this list for pending input whenever
# we have nothing else to do.
#
# Raises DIE "concur" if the stream has no usable io descriptor:
#
{ reader_and_vector # (reader, string) pair; the string is presumably input already buffered in the stream -- TODO confirm.
=
pur::get_reader (fil::get_instream input_stream);
thread
=
case reader_and_vector
#
(tbi::FILEREADER { io_descriptor => THE io_descriptor, ... }, "") # Nothing buffered and we have a descriptor: register it for polling.
=>
{ thread = REF (RUNNING []);
#
request = { io_descriptor, # Poll request: we only care about readability.
readable => TRUE,
writable => FALSE,
oobdable => FALSE
};
boss.unix_pipes_to_watch
:=
(thread, request) ! *boss.unix_pipes_to_watch;
thread;
};
(_, "") => { fil::say {. "make_unix_pipe_input_wait_queue: bad stream"; }; # Nothing buffered and no descriptor to poll on.
#
raise exception DIE "concur";
};
(_, _) => REF (DONE ()); # String is non-empty: return an already-finished thread so the caller does not block.
esac;
fil::set_instream # Sets input_stream to point to a new REF-cell holding same reader_and_vector as before.
(
input_stream,
pur::make_instream reader_and_vector # This allocates a new REF-cell holding reader_and_vector.
);
thread;
}; # fun make_unix_pipe_input_wait_queue
herein
fun read_line_from_unix_pipe
        #
        (boss: Makelib_Thread_Boss)
        #
        (pipe: fil::Input_Stream)
        #
        : Null_Or( String )
    =
    {   # Block this Makelib_Thread (but not the whole process)
        # until 'pipe' has input: we wait on a proxy thread for the
        # pipe, which lets other Makelib_Threads run in the meantime.
        #
        wait_for_thread_to_finish_then_return_result
            #
            boss
            #
            (make_unix_pipe_input_wait_queue boss pipe);

        # Input is now available, so fetch a line:
        #
        fil::read_line pipe;
    };
end; # stipulate
end; # stipulate
}; # package makelib_thread_boss
end; # stipulate
## (C) 1999 Lucent Technologies, Bell Laboratories
## Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.