


## thread-scheduler.pkg
## COPYRIGHT (c) 1989-1991 John H. Reppy
# Compiled by:
# src/lib/std/standard.lib

# This module implements the scheduling queues and preemption
# mechanisms.
### "Fill the unforgiving minute with
### sixty seconds worth of distance run"
###
### -- Rudyard Kipling
stipulate
# Short local names for the packages this file depends on.
#
# Each alias must sit on its own line: a '#' comment runs to end-of-line,
# so fusing several aliases onto one line silently comments out every
# alias after the first.
#
package cpu =  cpu_bound_task_pthreads;		# cpu_bound_task_pthreads	is from   src/lib/std/src/pthread/cpu-bound-task-pthreads.pkg
package fat =  fate;				# fate				is from   src/lib/std/src/nj/fate.pkg
package frq =  set_sigalrm_frequency;		# set_sigalrm_frequency		is from   src/lib/std/src/nj/set-sigalrm-frequency.pkg
package io  =  io_bound_task_pthreads;		# io_bound_task_pthreads	is from   src/lib/std/src/pthread/io-bound-task-pthreads.pkg
package iow =  io_wait_pthread;			# io_wait_pthread		is from   src/lib/std/src/pthread/io-wait-pthread.pkg
package itt =  internal_threadkit_types;	# internal_threadkit_types	is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
package pth =  pthread;				# pthread			is from   src/lib/std/src/pthread.pkg
package sig =  runtime_signals;			# runtime_signals		is from   src/lib/std/src/nj/runtime-signals.pkg
package tim =  time;				# time				is from   src/lib/std/time.pkg
package tkq =  threadkit_queue;			# threadkit_queue		is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-queue.pkg
package wxp =  winix::process;			# winix::process		is from   src/lib/std/src/posix/winix-process.pkg
#
# Unqualified names for the fate ("continuation") type and the two
# fate primitives used throughout this file:
#
Fate(X) =  fat::Fate(X);
#
call_with_current_fate =  fat::call_with_current_fate;
resume_fate            =  fat::resume_fate;
herein
# The thread scheduler package, (weakly) sealed by the Thread_Scheduler api.
#
# The opening '{' of the package body must be on its own line: placed after
# the '#' comment it would be part of the comment and the body would never open.
#
package thread_scheduler
: (weak)  Thread_Scheduler				# Thread_Scheduler	is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.api
{
# Local name for the thread record type defined in internal_threadkit_types:
#
Thread =  itt::Thread;
##################################################
# This section contains (partial) support for
# inter-pthread communication per
#
# src/lib/std/src/pthread/template-pthread.pkg #
# Process id of the current process while the server is running, otherwise zero.
# NOTE(review): nothing in the visible part of this file ever assigns to 'pid' -- presumably set by start-up code elsewhere; confirm.
#
pid =  REF 0;

# Return TRUE iff 'pid' is nonzero AND equals the id of the process we are executing in.
#
# Comparing against the live process id (rather than just testing for nonzero)
# means that if the heap gets dumped to disk and later reloaded, is_running()
# will (correctly) return FALSE even though 'pid' may never have been zeroed.
#
fun is_running ()
    =
    not (*pid == 0   or   *pid != wxp::get_process_id ());
# Mutex and condition variable used by echo() and
# get_new_inter_pthread_requests() around every access to request_queue:
#
mutex   =  pth::make_mutex   ();
condvar =  pth::make_condvar ();

# One record type for each pthread-level
# request supported by the server:
#
Do_Echo =   { what:   String,			# String to hand back.
              reply:  String -> Void		# Callback which receives 'what'.
            };

# Sum of all supported request kinds (currently just one):
#
Request =  DO_ECHO  Do_Echo;

# Pending requests from client pthreads.
# echo() pushes new requests on the head, so the list is newest-first.
#
request_queue =  REF []  :  Ref( List(Request) );
# Return TRUE iff no requests are currently pending in request_queue.
#
# We cannot write just
#
#     fun request_queue_is_empty () = (*request_queue == []);
#
# because Request is not an equality type. (The 'reply' fields are functions
# and Mythryl does not support comparison of functions for equality.)
# So we pattern-match on the list shape instead:
#
fun request_queue_is_empty ()
    =
    case *request_queue
	#
	(_ ! _) =>  FALSE;
	[]      =>  TRUE;
    esac;
# Internal fn -- will execute in context of server pthread.
#
# Service one echo request by passing
# its 'what' string to its 'reply' callback.
#
fun do_echo ({ what, reply }: Do_Echo)
    =
    reply what;
# External fn -- will execute in context of client pthread.
#
# Submit an echo request: while holding 'mutex', push the request on the
# head of request_queue and broadcast on 'condvar', then release 'mutex'.
# The request is serviced later, when the queue is drained.
#
fun echo (request: Do_Echo)
    =
    {   pth::acquire_mutex  mutex;
	#
	already_pending =  *request_queue;
	request_queue  :=  (DO_ECHO request) ! already_pending;		# Newest request goes on the front.
	#
	pth::broadcast_condvar  condvar;
	#
	pth::release_mutex  mutex;
    };
# Atomically take every pending request off request_queue.
#
# Returns the requests oldest-first; leaves request_queue empty.
# Currently never blocks -- see the commented-out wait loop below.
#
fun get_new_inter_pthread_requests ()
    =
    {   pth::acquire_mutex  mutex;
	#
#	for (request_queue_is_empty()) {				# DO NOT DELETE THIS CLAUSE!
#	    #								# Eventually we should execute it when both run queues are empty
#	    pth::wait_on_condvar (condvar, mutex);			# (while awaiting new work from worker pthreads).
#	};
	#
	newest_first =  *request_queue;					# Snapshot the queue ...
	request_queue := [];						# ... and empty it, all under the mutex.
	#
	pth::release_mutex  mutex;
	#
	reverse  newest_first;						# 'reverse' to restore original request ordering.
    };
# Service the given list of requests in list order.
# DO_ECHO is the only request kind, and is handled by do_echo.
#
fun service_inter_pthread_requests  []
	=>
	();

    service_inter_pthread_requests  ((DO_ECHO r) ! rest)
	=>
	{   do_echo r;
	    #
	    service_inter_pthread_requests  rest;
	};
end;
# REMAINING WORK:
# I put
#
# service_inter_pthread_requests (get_new_inter_pthread_requests());
#
# in alarm_handler() because that is the only place in this file
# which can guarantee a response in a bounded amount of time,
# but we probably also want to opportunistically do the same in
# some of our other thread-switch functions.
#
# ALSO, when both run queues are empty, we should eventually
# block by executing the wait_on_condvar logic in get_new_inter_pthread_requests().
# (Maybe we need separate blocking and nonblocking versions of it?)
#
#
# End of pthread-support section
##################################################
# Conditionally write strings to tracing.log or whatever.
#
# Normally we'd write here
#
#     trace =  tr::log_if  tr::make_logtree_leaf { parent => tr::all_logging, name => "thread_scheduler_logging" };
#
# but that produces a package cycle, so instead we set up a
# refcell holding a do-nothing function, which gets backpatched in
#
#     src/lib/src/lib/thread-kit/src/lib/logger.pkg
#
trace_backpatchfn =  REF (fn _ = ())  :  Ref( (Void -> String) -> Void );

# Hand 'printfn' (a thunk producing the message text) to whatever
# logging function is currently installed in trace_backpatchfn.
#
# To debug via tracelogging, annotate the code with lines like
#
#     trace .{ sprintf "foo/top: bar d=%d" bar; };
#
fun trace printfn
    =
    {   current_logfn =  *trace_backpatchfn;
	#
	current_logfn  printfn;
    };
# Some utility functions that should be inlined
#
# Reverse-and-append:  returns the elements of 'remaining'
# in reverse order, followed by 'accumulated'.
#
# E.g.   reverse ([1, 2], [0])  ==  [2, 1, 0]
#
# From here on this two-argument version shadows
# the one-argument library 'reverse'.
#
fun reverse (remaining, accumulated)
    =
    case remaining
	#
	[]         =>  accumulated;
	(x ! rest) =>  reverse (rest, x ! accumulated);
    esac;
# Accessors for the 'current thread' register.
#
# The current thread is represented using a globally allocated register.
# This is a real register on RISC architectures but a memory location
# on the register-starved intel32 architecture -- see current_thread_ptr in
#
#     src/lib/compiler/back/low/main/intel32/backend-lowhalf-intel32-g.pkg
#
get_current_thread =  unsafe::get_current_thread_register:  Void -> Thread;	# CALLING THIS (AND USING RETURN VALUE) WHEN THREAD SCHEDULER IS NOT RUNNING CAN SEGV!!   XXX BUGGO FIXME
set_current_thread =  unsafe::set_current_thread_register:  Thread -> Void;

# Body for the placeholder fates below: unconditionally raises FAIL.
# (This definition must start on its own line; if it is appended to a
# '#' comment line it becomes part of the comment and 'bogus' is undefined.)
#
fun bogus _
    =
    raise exception FAIL "should never see this ";

# Placeholder fates used as the initial values of the hooks:
#
bogus_hook          =  fat::make_isolated_fate  bogus  :  Fate( Void );
bogus_shutdown_hook =  fat::make_isolated_fate  bogus  :  Fate( (Bool, winix::process::Status) );

# The three scheduler hooks; all start out pointing at placeholders.
#
scheduler_hook =  REF bogus_hook;
pause_hook     =  REF bogus_hook;
shutdown_hook  =  REF bogus_shutdown_hook;
#
# The scheduler defines three fate "hooks":
#
# scheduler_hook -- This points to a fate that gets dispatched
# when a thread attempts to exit a critical
# section and there is a signal pending.
# It is invoked after re-enabling thread scheduling,
# which is to say, after leaving the critical section.
#
# pause_hook -- This points to a fate that gets invoked when
# there is nothing else to do.
#
# shutdown_hook -- This points to a fate that gets invoked when
# the system is deadlocked, or when
#
# run_threadkit::exit_threadkit
#
# is called. It takes two arguments: the first is a
# boolean flag that says whether to do clean-up and
# the second is the integer exit status ala unix.
# The dummy thread.
#
# A placeholder Thread value, used when
# a thread is needed merely to get the types right.
# In this file it tags temporary threads, is returned
# (paired with the pause hook) when both run queues are
# empty, and is installed as current thread on reset.
#
dummy_thread
    =
    itt::THREAD
      {
	thread_id	  =>  -1,
	name		  =>  "thread_scheduler dummy",
	#
	did_mail	  =>  REF FALSE,
	properties	  =>  REF [],
	dead		  =>  itt::CONDITION_VARIABLE (REF (itt::CVAR_UNSET [])),
	#
	exception_handler =>  REF (fn _ = ())
      };
# The error thread.
#
# This thread exists to trap attempts to use
# threadkit without proper initialization
# (i.e., without going through run_threadkit).
# reset_thread_scheduler enqueues it, paired with
# error_fate, whenever it is called with running == FALSE.
#
error_thread
    =
    itt::THREAD
      {
	thread_id	  =>  -2,
	name		  =>  "thread_scheduler error imp",
	#
	exception_handler =>  REF (fn _ = ()),
	#
	dead		  =>  itt::CONDITION_VARIABLE (REF (itt::CVAR_UNSET [])),
	properties	  =>  REF [],
	did_mail	  =>  REF FALSE
      };
# Fate run by the error thread: logs a diagnostic through
# threadkit_debug::say_debug and then raises FAIL.
#
stipulate
    #
    fun complain_about_missing_initialization _
	=
	{   threadkit_debug::say_debug "**** Must do run_threadkit() before using threadkit concurrency support. ****\n";
	    #
	    raise exception FAIL "threadkit not initialized";
	};
herein
    #
    my error_fate:  Fate( Void )
	=
	fat::make_isolated_fate  complain_about_missing_initialization;
end;
# Per-thread mail-activity tracking.
#
# Each thread record carries a 'did_mail' refcell.
# We use it to try to tell I/O-bound interactive
# foreground threads from CPU-bound background threads,
# the idea being to improve system responsiveness by
# giving foreground threads scheduling priority.
#
# The three functions below respectively
# set, clear and read that flag:
#
fun set_mail_flag   (itt::THREAD { did_mail, ... }) =   did_mail := TRUE;
fun clear_mail_flag (itt::THREAD { did_mail, ... }) =   did_mail := FALSE;
fun thread_did_mail (itt::THREAD { did_mail, ... }) =  *did_mail;
# The two queues of ready-to-run threads.
# Each entry pairs a thread with the fate which resumes it.
#
#     foreground_run_queue:
#         For interactive foreground jobs
#         needing prompt servicing.
#
#     background_run_queue:
#         For CPU-intensive jobs not
#         needing quick response.
#
# In practice we consider a job to be 'I/O bound' if
# we see it do a mail op in a given timeslice, otherwise
# we consider it to be CPU bound.  We guarantee the
# 'I/O bound' jobs half the available cycles.
#
my background_run_queue:  tkq::Threadkit_Queue( (itt::Thread, Fate(Void)) )
    =
    tkq::make_threadkit_queue ();

my foreground_run_queue:  tkq::Threadkit_Queue( (itt::Thread, Fate(Void)) )
    =
    tkq::make_threadkit_queue ();
# Fast-path enqueue operations on the foreground run queue.
# We grab the queue's 'rear' refcell once, here, and push directly onto it.
#
stipulate
    foreground_run_queue
	->
	itt::THREADKIT_QUEUE { rear => foreground_rear, ... };
herein
    # Enqueue a ready (thread, fate) pair on the foreground queue:
    #
    fun enqueue  ready_thread
	=
	foreground_rear :=  ready_thread ! *foreground_rear;

    # Same, but first set the thread's did_mail flag:
    #
    fun set_mail_flag_and_enqueue  (ready_thread as (thread, _))
	=
	{   set_mail_flag  thread;
	    #
	    enqueue  ready_thread;
	};
end;
# Synonym for set_mail_flag_and_enqueue:
#
fun enqueue_thread  ready_thread
    =
    set_mail_flag_and_enqueue  ready_thread;

# Enqueue the current thread (to be resumed via 'resume', with its
# mail flag set) and make the given thread be the current one.
# Note that this does not itself transfer control.
#
fun enqueue_and_switch_current_thread (resume, thread)
    =
    {   outgoing_thread =  get_current_thread ();
	#
	set_mail_flag_and_enqueue (outgoing_thread, resume);
	#
	set_current_thread  thread;
    };
#
# Pick the next (thread, fate) pair to run.
#
# dequeue_io_bound_thread takes from foreground_run_queue, falling back
# to dequeue_cpu_bound_thread when the foreground queue is completely empty.
# dequeue_cpu_bound_thread takes from background_run_queue, and returns
# (dummy_thread, *pause_hook) when that is completely empty too.
#
# Each queue is a 'front' list plus a 'rear' list; the enqueue functions in
# this file push onto the head of 'rear', so 'rear' is reversed when it is
# moved to 'front' in order to hand threads out oldest-first.
#
fun dequeue_io_bound_thread () # Dequeue a thread from the queue for 'I/O bound' threads:
=
case foreground_run_queue
#
# Foreground queue entirely empty: try the background queue instead.
#
itt::THREADKIT_QUEUE { front => REF [], rear => REF [] }
=>
dequeue_cpu_bound_thread ();
#
# Front exhausted but rear is not: reverse rear, return
# its first element and keep the remainder as the new front.
#
itt::THREADKIT_QUEUE { front as (REF []), rear as (REF l) }
=>
{ my (head, rest)
=
case (reverse (l, [])) (head ! rest) => (head, rest); # The operative case.
/* */ _ => raise exception MATCH; # Impossible case because 'l' is known to be non-NIL. We include this case merely to suppress the 'nonexhaustive match' warning.
esac;
front := rest;
rear := [];
head;
};
#
# Front is nonempty: pop its first element.
#
itt::THREADKIT_QUEUE { front as (REF (first ! rest)), ... }
=>
{ front := rest;
first;
};
esac
#
also
fun dequeue_cpu_bound_thread () # Remove a thread from the queue for 'cpu-bound' threads.
= # We assume the 'io-bound' thread queue is empty:
case background_run_queue
#
# Nothing runnable at all: hand back the pause hook, tagged with the dummy thread.
#
itt::THREADKIT_QUEUE { front => REF [], rear => REF [] }
=>
(dummy_thread, *pause_hook);
#
# Front exhausted but rear is not: move reversed rear to front, then retry
# (the retry will hit the nonempty-front case below).
#
itt::THREADKIT_QUEUE { front as REF [], rear as REF l }
=>
{ rear := [];
front := reverse (l, []);
dequeue_cpu_bound_thread ();
};
#
# Front is nonempty: pop its first element.
#
itt::THREADKIT_QUEUE { front as REF (first ! rest), ... }
=>
{ front := rest;
first;
};
esac;
#
# Promote a thread from the 'cpu-bound' queue to the 'io-bound' queue.
#
# We ask tkq::next for an entry from background_run_queue; if it yields one
# we enqueue it on the foreground queue (without touching its mail flag),
# otherwise we do nothing.
# NOTE(review): presumably tkq::next also removes the entry from the background queue -- confirm in threadkit-queue.pkg.
#
fun promote_some_background_thread_to_foreground ()
    =
    case (tkq::next background_run_queue)
	#
	NULL          =>  ();
	THE promotee  =>  enqueue promotee;
    esac;
#
# Global flag for implementing critical sections,
# which is to say, enabling and disabling thread switching:
#
#     MULTI_THREAD    Thread switching enabled (the initial state).
#     MONO_THREAD     Set by disable_thread_switching.
#     SIGNAL_PENDING  Set by alarm_handler when the alarm arrives in MONO_THREAD state.
#
Thread_Scheduler_State
    = MULTI_THREAD
    | MONO_THREAD
    | SIGNAL_PENDING
    ;

thread_scheduler_state =  REF MULTI_THREAD;

# Transfer control to whatever fate is currently installed in scheduler_hook.
#
# Note, the first thing the scheduler hook
# does is a disable_thread_switching,
# so we do not need to clear thread_scheduler_state here:
#
fun dispatch_scheduler_hook ()
    =
    {   hook =  *scheduler_hook;
	#
	resume_fate  hook  ();
    };
# fun enqueueSchedulerHook () = let
# myfate
# =
# call_with_current_fate (fn fate => (
# call_with_current_fate (fn fate' => resume_fate fate fate');
# dispatchSchedulerHook ()))
#
# my itt::THREADKIT_QUEUE { front, ... } = rdyQ1
# in
# front := (dummyTid, myfate) ! *front
# end
#
# Enter a critical section: put the scheduler in MONO_THREAD state.
# While in that state alarm_handler does not preempt the running
# thread; it just records SIGNAL_PENDING.
#
fun disable_thread_switching ()
    =
    {   thread_scheduler_state :=  MONO_THREAD;
    };
#
# Leave a critical section, honoring any alarm which arrived while we were inside it.
#
# If alarm_handler has marked the state SIGNAL_PENDING we capture the caller's
# fate, enqueue the current thread on the foreground queue with that fate, and
# jump to the scheduler hook; the caller continues when its fate is next
# dequeued and resumed.  Otherwise we simply return to MULTI_THREAD state.
#
fun reenable_thread_switching () # Exit a critical section.
=
case *thread_scheduler_state
#
SIGNAL_PENDING
=>
call_with_current_fate
(
fn fate = { enqueue (get_current_thread(), fate); # Plain 'enqueue': the thread's mail flag is left untouched.
#
dispatch_scheduler_hook ();
}
);
_ => thread_scheduler_state := MULTI_THREAD; # "NOTE: There is a race condition between testing *thread_scheduler_state
# above setting of it to MULTI_THREAD here but this is not a problem in
# practice because there are no garbage collector tests between these
# and thus no preemption, due to the runtime system's implementation
# of preemption." -- John H Reppy
esac;
#
# Leave a critical section by abandoning the caller and running some other thread.
#
# The caller's own fate is NOT saved here: a caller wishing to
# run again must already have enqueued itself.
#
# If an alarm is pending we jump to the scheduler hook.  Otherwise we dequeue
# the next ready (thread, fate) pair -- foreground queue first -- make that
# thread current, return to MULTI_THREAD state and resume its fate.
#
fun reenable_thread_switching_and_dispatch_next_thread ()
=
case *thread_scheduler_state
#
SIGNAL_PENDING
=>
dispatch_scheduler_hook ();
_ => { (dequeue_io_bound_thread ())
->
(id, fate);
set_current_thread id;
thread_scheduler_state := MULTI_THREAD; # But what about here? Above comment does not seem to apply. -- 2012-03-20 CrT
resume_fate fate ();
};
esac;
#
# Abandon the caller and run the next ready thread.
#
# We enter MONO_THREAD state (exactly what disable_thread_switching does)
# and then leave it again via reenable_thread_switching_and_dispatch_next_thread,
# which does the actual dequeue-and-resume.
#
fun dispatch_next_thread ()
    =
    {   thread_scheduler_state :=  MONO_THREAD;
	#
	reenable_thread_switching_and_dispatch_next_thread ();
    };
#
# Leave a critical section by switching directly to a specific thread.
#
#     tid    Thread to switch to.
#     fate   Fate which continues that thread.
#     x      Value to pass to 'fate'.
#
# The caller's fate is captured as current_fate and enqueued on the
# foreground queue, so the caller continues when it is next dispatched.
#
fun reenable_thread_switching_and_switch_to_thread (tid, fate, x)
=
call_with_current_fate (
fn current_fate
=
{ case *thread_scheduler_state
#
# An alarm arrived during the critical section: enqueue BOTH threads and
# let the scheduler hook choose.  fate' is the point just after this inner
# call_with_current_fate, so when the scheduler later resumes it, control
# falls out of the 'case' into the 'resume_fate fate x' below.
#
SIGNAL_PENDING
=>
call_with_current_fate
(
fn fate'
=
{ enqueue (tid, fate');
enqueue (get_current_thread(), current_fate);
dispatch_scheduler_hook ();
}
);
#
# No alarm pending: enqueue the caller (with mail flag set),
# make 'tid' the current thread and re-enable thread switching.
#
_ => { enqueue_and_switch_current_thread (current_fate, tid);
#
thread_scheduler_state := MULTI_THREAD;
};
esac;
resume_fate fate x; # Actually transfer control to the target thread.
}
);
#
# Leave a critical section by yielding the processor.
#
# The current thread is put on the foreground queue (with its mail flag set),
# to be continued later via 'fate'; then we dispatch whichever thread is next.
#
fun reenable_thread_switching_and_yield_to_next_thread  fate
    =
    {   yielding_thread =  get_current_thread ();
	#
	set_mail_flag_and_enqueue (yielding_thread, fate);
	#
	reenable_thread_switching_and_dispatch_next_thread ();
    };
# Create a temporary thread (with dummy ID)
# to run the given function and then exit.
# The thread is placed on the front of the
# scheduling queue.
#
# 'f' is not run now: it runs when the temporary thread is dispatched.
# Any exception 'f' raises is silently discarded, after which
# the temporary thread exits by dispatching the next ready thread.
#
fun enqueue_tmp_thread f
=
{
# "We should do this, but the overhead
# is too high right now:" XXX BUGGO FIXME
#
# myfate = fat::make_isolated_fate f
#
# so instead we do this:
#
# The outer call captures 'fate' (the binding of myfate); the inner call
# captures fate' (the point just before 'f ()') and immediately throws it
# to 'fate'.  Net effect: myfate is bound to fate', and 'f ()' executes
# only if and when myfate is later resumed.
#
myfate = call_with_current_fate
(fn fate
=
{ call_with_current_fate (fn fate' = resume_fate fate fate');
#
f ()
except
_ = ();
dispatch_next_thread ();
}
);
foreground_run_queue
->
itt::THREADKIT_QUEUE { front, ... };
front := (dummy_thread, myfate) ! *front; # Push on the head of 'front' (not 'rear') so it is dequeued before anything already queued.
};
# Default scheduler hook: a fate which just runs dispatch_next_thread.
# reset_thread_scheduler installs this in scheduler_hook.
#
default_scheduler_hook
    =
    fat::make_isolated_fate  dispatch_next_thread
    :
    Fate( Void );
# Time-of-day cache for the current timeslice:
# THE time when known, NULL when not.
#
my cached_approximate_time:  Ref( Null_Or( tim::Time ) )
    =
    REF NULL;
#
# The point of this variable is
# to make the current system time
# available to threads without
# having them all constantly making
# expensive system calls to find out
# the time.
#
# The idea is to cache the current time
# (when known) and just re-use it through
# the end of the current timeslice.
#
# It is cleared to NULL at the end
# of each timeslice.
#
# When get_approximate_time is called,
# it just returns the value of this
# variable if set, otherwise it makes
# the required system call and caches
# the result in this variable before
# returning it.
# Time		is from   src/lib/std/src/time.api
# time		is from   src/lib/std/time.pkg
# time_guts	is from   src/lib/std/src/time-guts.pkg

# Return an approximation of the
# current time of day. This is at
# least as accurate as the time quantum:
#
# Return the cached time if we have one; otherwise ask the
# system for the current UTC time, cache it in
# cached_approximate_time, and return it.
#
# alarm_handler clears the cache on every alarm, so the result
# is stale by at most about one time quantum.
#
fun get_approximate_time ()
    =
    case *cached_approximate_time
	#
	NULL =>
	    {   now =  tim::get_current_time_utc ();
		#
		cached_approximate_time :=  THE now;
		#
		now;
	    };
	#
	THE cached =>  cached;
    esac;
#
# Preempt the current thread, which is to be continued later via 'current_fate'.
#
# A thread which did no mail op since its flag was last cleared is treated
# as CPU-bound and goes on the background queue.  One which did is treated as
# I/O-bound: we clear its flag, promote one background thread to the
# foreground queue, and then put the thread itself on the foreground queue.
#
# This only queues the thread; picking the next thread to run is up to the caller.
#
fun preempt current_fate
    =
    {   me =  get_current_thread ();
	#
	if (not (thread_did_mail me))
	    #
	    tkq::enqueue (background_run_queue, (me, current_fate));
	else
	    clear_mail_flag me;
	    promote_some_background_thread_to_foreground ();
	    enqueue (me, current_fate);
	fi;
    };
# This is the function which
# drives timeslicing. It is
# invoked at (typically) 50Hz
# by the posix ALRM signal.
#
# (Note: The runtime does
# not call us when the signal
# is actually received, but
# rather at the next garbage
# collection check point, when
# the state of the heap is
# well defined.)
#
# RETURN VALUE:
# We return the fate ("continuation")
# to be run on completion of handling
# this signal.
# Our 'fate' argument is the code
# that was running when the alarm_handler
# interrupted it, so we can resume the
# interrupted thread simply by returning
# this fate.
# Alternatively we may switch threads
# by returning the fate corresponding to
# some other ready-to-run thread.
#
# SIGALRM handler: services inter-pthread requests, invalidates the
# cached time, and decides which fate runs next.
#
# Returns:
#     MULTI_THREAD state:    the scheduler hook fate (after queueing the interrupted thread via 'preempt').
#     MONO_THREAD state:     the interrupted fate, after marking the state SIGNAL_PENDING.
#     SIGNAL_PENDING state:  the interrupted fate, unchanged.
#
fun alarm_handler
( _, # Int -- signal number, in this case for SIGALRM.
_, # Int -- count of times SIGALRM has happened since our last call, from c_signal_handler in src/c/machine-dependent/posix-signal.c
current_fate # Fate ("continuation") which was interrupted to run us.
)
=
{
# This is the only place where inter-pthread
# requests are guaranteed to be handled in a
# bounded amount of time:
#
service_inter_pthread_requests (get_new_inter_pthread_requests());
# We keep cached_approximate_time
# accurate to one time quantum.
#
# Since we just started
# a new timeslice we
# clear it -- we no longer
# know what time it is:
#
cached_approximate_time
:=
NULL;
case *thread_scheduler_state
#
MULTI_THREAD
=>
{ # Put current thread to sleep
# and give someone else a chance
# to run:
#
preempt current_fate;
*scheduler_hook; # Return the scheduler hook fate as the one to run next; it will then select next user thread to run.
};
MONO_THREAD
=>
{
# We're in a critical section
# (in plain English: thread switching
# is disabled), so we cannot preempt
# the currently running thread.
#
# So instead we make a note to
# preempt the current thread
# as soon as it exits the
# critical section (which is to
# say, re-enables thread switching)
# and then continue running the
# current thread:
#
thread_scheduler_state := SIGNAL_PENDING;
#
current_fate;
};
_ => current_fate; # Already SIGNAL_PENDING: nothing more to record, just continue the interrupted thread.
esac;
};
# Timeslice length.
#
# By default we time-slice at 20ms (50Hz),
# but the user can change this via the
# run_threadkit time quantum argument.
#
default_time_quantum =  tim::from_milliseconds 20;

# The quantum currently in effect.  Updated by start_thread_scheduler_timer,
# read back by restart_thread_scheduler_timer:
#
time_quantum =  REF default_time_quantum;
# Private to this file; called only from start_thread_scheduler_timer (below).
#
# Start the io-wait pthread plus enough cpu-bound-task and io-bound-task
# pthreads to bring each pool up to the target size.
#
# For now we start these but never stop them, and piggyback
# it on the regular start_thread_scheduler_timer call to
# avoid changing the externally visible api.
#
# This call is designed to be a no-op
# on second and subsequent calls.
#
fun start_supporting_pthreads ()
    =
    {   # For a first try at least, we'll start one fewer cycleserver
	# pthreads than we have cores (to leave one core available for
	# foreground processing) and use the same number of io pthreads
	# (for lack of a better idea).  We need at least one cpu- and
	# one io- pthread running, hence the 'max':
	#
	pthreads_to_start =  max (1, pth::get_cpu_core_count () - 1);

	# Sample both pool sizes before starting anything:
	#
	cpu_already_running =  cpu::get_count_of_running_pthreads ();
	io_already_running  =  io::get_count_of_running_pthreads ();

	iow::start "thread-scheduler";					# String arg just logs us as party responsible for start-up. This is a no-op if it is already running.

	# Top up the cpu-bound-task pool:
	#
	for (running = cpu_already_running;   running < pthreads_to_start;   ++running) {
	    #
	    cpu::start "thread-scheduler";				# String arg just logs us as party responsible for start-up.
	};

	# Top up the io-bound-task pool:
	#
	for (running = io_already_running;   running < pthreads_to_start;   ++running) {
	    #
	    io::start "thread-scheduler";				# String arg just logs us as party responsible for start-up.
	};
    };
#
# Start timeslicing.
#
#     new_time_quantum   Desired timeslice length.  Anything not strictly
#                        greater than tim::zero_time is replaced by default_time_quantum.
#
# We start the supporting pthreads, record the effective quantum in
# time_quantum, install alarm_handler as the handler for the alarm signal,
# and set the SIGALRM frequency to that quantum.  Returns Void.
#
fun start_thread_scheduler_timer  new_time_quantum
    =
    {   start_supporting_pthreads ();
	#
	effective_quantum
	    =
	    case (tim::(<) (tim::zero_time, new_time_quantum))
		#
		TRUE  =>  new_time_quantum;
		FALSE =>  default_time_quantum;
	    esac;

	time_quantum :=  effective_quantum;

	sig::set_signal_handler (sig::alarm_signal, sig::HANDLER alarm_handler);

	frq::set_sigalrm_frequency (THE effective_quantum);

	();
    };
#
# Stop timeslicing: set the SIGALRM frequency to NULL, then set the
# alarm signal's handler to IGNORE.  time_quantum is left untouched so
# that restart_thread_scheduler_timer can pick up where we left off.
#
fun stop_thread_scheduler_timer ()
    =
    {   frq::set_sigalrm_frequency  NULL;
	#
	sig::set_signal_handler (sig::alarm_signal, sig::IGNORE);
	#
	();
    };
#
# Resume timeslicing using the most recently recorded time quantum.
#
fun restart_thread_scheduler_timer ()
    =
    {   remembered_quantum =  *time_quantum;
	#
	start_thread_scheduler_timer  remembered_quantum;
    };
# Return the scheduler to a known state:
#
#   o  Current thread becomes dummy_thread.
#   o  pause_hook and shutdown_hook revert to their placeholders;
#      scheduler_hook becomes default_scheduler_hook.
#   o  The cached time is forgotten.
#   o  Both run queues are reset.
#   o  If 'running' is FALSE the error thread is enqueued, so that an
#      attempt to schedule threads without initialization gets reported.
#
# Note that thread_scheduler_state is not touched here.
#
fun reset_thread_scheduler  running
    =
    {   set_current_thread  dummy_thread;
	#
	pause_hook		:=  bogus_hook;
	shutdown_hook		:=  bogus_shutdown_hook;
	scheduler_hook		:=  default_scheduler_hook;
	cached_approximate_time	:=  NULL;
	#
	tkq::reset  foreground_run_queue;
	tkq::reset  background_run_queue;
	#
	case running
	    #
	    TRUE  =>  ();
	    FALSE =>  enqueue_thread (error_thread, error_fate);
	esac;
    };

# Put the scheduler in its not-running state when this package is loaded.
# ("my _ =" because only declarations are syntactically legal here.)
#
my _ =  reset_thread_scheduler  FALSE;
};
end;


