## microthread-preemptive-scheduler.pkg # Derives from cml/src/core-cml/scheduler.sml
#
# Preemptive timeslicing of microthreads.
#
#
# Nomenclature
# ============
#
# It is critically important to track uninterruptible
# scopes (== dynamic scopes where switching threads is
# forbidden, implemented by nonzero value of uninterruptible_scope_mutex)
# so we adopt the following convention for function name suffixes:
#
# __eu # ("eu" == "enter uninterruptible scope") This function begins uninterruptible scope -- i.e., increments *uninterruptible_scope_mutex.
# __iu # ("iu" == "in uninterruptible scope") Fn must be called in uninterruptible scope -- i.e., when *uninterruptible_scope_mutex is nonzero.
# __xu # ("xu" == "exit uninterruptible scope") This function exits uninterruptible scope -- i.e., decrements *uninterruptible_scope_mutex.
# Compiled by:
#     src/lib/std/standard.lib

### "Fill the unforgiving minute with
###  sixty seconds worth of distance run"
###
###                    -- Rudyard Kipling
stipulate
package at  =  run_at__premicrothread;              # run_at__premicrothread           is from   src/lib/std/src/nj/run-at--premicrothread.pkg
package cpu =  cpu_bound_task_hostthreads;          # cpu_bound_task_hostthreads       is from   src/lib/std/src/hostthread/cpu-bound-task-hostthreads.pkg
package fat =  fate;                                # fate                             is from   src/lib/std/src/nj/fate.pkg
package fil =  file__premicrothread;                # file__premicrothread             is from   src/lib/std/src/posix/file--premicrothread.pkg
package frq =  set_sigalrm_frequency;               # set_sigalrm_frequency            is from   src/lib/std/src/nj/set-sigalrm-frequency.pkg
package hth =  hostthread;                          # hostthread                       is from   src/lib/std/src/hostthread.pkg
package io  =  io_bound_task_hostthreads;           # io_bound_task_hostthreads        is from   src/lib/std/src/hostthread/io-bound-task-hostthreads.pkg
package iow =  io_wait_hostthread;                  # io_wait_hostthread               is from   src/lib/std/src/hostthread/io-wait-hostthread.pkg
package itt =  internal_threadkit_types;            # internal_threadkit_types         is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
#   package mop =  mailop;                          # mailop                           is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
package is  =  interprocess_signals;                # interprocess_signals             is from   src/lib/std/src/nj/interprocess-signals.pkg
package thd =  threadkit_debug;                     # threadkit_debug                  is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-debug.pkg
package tim =  time;                                # time                             is from   src/lib/std/time.pkg
package rwq =  rw_queue;                            # rw_queue                         is from   src/lib/src/rw-queue.pkg
package wxp =  winix__premicrothread::process;      # winix__premicrothread::process   is from   src/lib/std/src/posix/winix-process--premicrothread.pkg
#
nb = log::note_on_stderr;                           # log                              is from   src/lib/std/src/log.pkg

# Local shorthands for the fate and microthread types.
Fate(X) = fat::Fate(X);
Microthread = itt::Microthread;
#
# Local shorthands for the two fate primitives used throughout this file.
call_with_current_fate = fat::call_with_current_fate;
switch_to_fate = fat::switch_to_fate;
microthread_scheduler_hostthread_id = 0;            # Keep synched with MICROTHREAD_SCHEDULER_HOSTTHREAD_ID in src/c/h/runtime-base.h
herein
package microthread_preemptive_scheduler
: (weak) Microthread_Preemptive_Scheduler # Microthread_Preemptive_Scheduler is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.api
{
# Scheduler-wide mutable state and debugging counters.
#
# uninterruptible_scope_mutex is an alias for the refcell owned by the log package,
# so assignments through either name update the same counter.
uninterruptible_scope_mutex = log::uninterruptible_scope_mutex; # Putting the mutex in log allows its value to be printed from virtually anywhere -- handy during debugging.
# NOTE(review): disable_debug_ramlogging is not read anywhere in the visible part of this file -- presumably consulted elsewhere; confirm.
disable_debug_ramlogging = REF FALSE;
# Initialized to the hostthread which evaluates this package body.
scheduler_hostthread = REF (hth::get_hostthread ()); # We publish this so that other hostthreads can wake us out of an is::pause() via pthread_kill(hostthread,SIGUSR1);
#
# Event counters, all starting at zero.  NOTE(review): judging by the names they are bumped by the
# alarm (timeslice) handler, which is not in the visible part of this file -- confirm.
alarm_handler_calls = REF 0;
alarm_handler_calls_with__uninterruptible_scope_mutex__set = REF 0;
alarm_handler_calls_with__microthread_switch_lock__set = REF 0;
# Logging function used by 'trace'; starts out as a no-op which ignores its argument.
trace_backpatchfn = REF (\\ _ = ()) : Ref( (Void -> String) -> Void ); # Exported.
#
# Conditionally write strings to tracing.log or whatever.
# Normally we'd write here
#
#     trace = tr::log_if tr::make_logtree_leaf 0 { parent => tr::all_logging, name => "thread_scheduler_logging", default => FALSE };
#
# but that produces a package cycle, so instead we set up a
# refcell with a dummy initial value that we backpatch in
#
#     src/lib/src/lib/thread-kit/src/lib/logger.pkg
#
# Hand 'printfn' (a Void -> String thunk producing the message) to whatever
# logging function is currently stored in trace_backpatchfn.
fun trace printfn = *trace_backpatchfn printfn; # To debug via tracelogging, annotate the code with lines like
# trace {. sprintf "foo/top: bar d=%d" bar; };
# When we have nothing to do we call is::pause(), which executes a
# Posix-level pause() that puts us to sleep until a Posix-level
# signal wakes us up.  Consequently if
#
#     src/lib/std/src/hostthread/io-bound-task-hostthreads.pkg
#
# hands us a mouseclick read from the X server socket, we will not
# respond until the next 50HZ SIGALRM wakes us, which can be 20ms --
# forever in computer terms.
#
# This fn sends SIGUSR1 to the published scheduler hostthread
# "immediately" (modulo Linux kernel response overhead), allowing us
# to begin processing the mouseclick (or whatever) in microseconds
# instead of milliseconds.
#
fun wake_scheduler_hostthread_if_paused ()
    =
    hth::signal_hostthread
      (
        *scheduler_hostthread,
        is::signal_to_int  is::SIGUSR1
      );
# A utility function that should be inlined:
#
# reverse (xs, acc) returns the elements of 'xs' in reverse order
# followed by the elements of 'acc' -- so reverse (xs, []) is plain list reversal.
#
fun reverse (xs, acc)
    =
    case xs
        #
        []           =>  acc;
        (x ! rest)   =>  reverse (rest, x ! acc);
    esac;
# Read and write the "current microthread" register.
# Both are the 'unsafe' package primitives re-exported under shorter names with explicit types.
get_current_microthread = unsafe::get_current_microthread_register: Void -> Microthread; # Exported. CALLING THIS (AND USING RETURN VALUE) WHEN THREAD SCHEDULER IS NOT RUNNING CAN SEGV!! XXX BUGGO FIXME
set_current_microthread = unsafe::set_current_microthread_register: Microthread -> Void; # Exported.
#
# The current thread is represented using a globally allocated register.
# This is a real register on RISC architectures but a memory location
# on the register-starved intel32 architecture -- see current_thread_ptr in
#
#     src/lib/compiler/back/low/main/intel32/backend-lowhalf-intel32-g.pkg
#
# Placeholder body for fates which should never actually be run:
# ignores its argument, complains on stderr and raises DIE.
fun bogus _
=
{
log::note_on_stderr {. "bogus() called, raising exception DIE -- thread-scheduler\n"; };
raise exception DIE "should never see this ";
};
# Isolated fates wrapping 'bogus', one per hook argument type.
# They serve as the initial contents of the scheduler hooks.
bogus_void_fate = fat::make_isolated_fate bogus : Fate( Void );
bogus_shutdown_fate = fat::make_isolated_fate bogus : Fate( (Bool, wxp::Status) );
######################################################################################################################################################################################
# The scheduler defines three fate "hooks":
#
# Each hook is a refcell which initially holds one of the bogus fates,
# so invoking a hook before a real fate has been stored in it raises DIE.
thread_scheduler_shutdown_hook = REF bogus_shutdown_fate; # Exported.
run_next_runnable_thread__xu__hook = REF bogus_void_fate; # Exported.
no_runnable_threads_left__hook = REF bogus_void_fate; # Exported.
#
#
# thread_scheduler_shutdown_hook
#     points to a fate that gets invoked when
#     the system is deadlocked, by
#         no_runnable_threads_left__fate set up by wrap_for_export from   src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg
#     or when one of
#         exit or exit_uncleanly from       src/lib/std/src/posix/winix-process.pkg
#         shut_down_thread_scheduler from   src/lib/src/lib/thread-kit/src/posix/thread-scheduler-control.pkg
#     is called (typically at the end of a threadkit app).
#
#     It takes two arguments:
#       o A boolean flag that says whether to do clean-up.
#       o The integer exit status ala unix.
#
#     This hook gets set mainly in start_up_thread_scheduler'' in   src/lib/src/lib/thread-kit/src/glue/thread-scheduler-control-g.pkg
#     but also in wrap_for_export in                                src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg
#
# run_next_runnable_thread__xu__hook
#     points to a fate that gets dispatched
#     when a thread attempts to exit a critical
#     section and there is a signal pending.
#
#     It is invoked after re-enabling thread scheduling,
#     which is to say, after leaving the critical section.
#
#     reset_thread_scheduler sets this to default_scheduler_fate (== dispatch_next_thread__noreturn).
#     This is set to wake_sleeping_threads_and_schedule_fd_io_and_harvest_dead_subprocesses__xu__fate (see below)
#     by start_up_thread_scheduler'' in   src/lib/src/lib/thread-kit/src/glue/thread-scheduler-control-g.pkg
#     and wrap_for_export in              src/lib/src/lib/thread-kit/src/posix/threadkit-driver-for-posix.pkg
#
#     Later this will get set to
#         wake_sleeping_threads_and_schedule_fd_io_and_harvest_dead_subprocesses__xu__fate from   src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg
#     which mostly wraps
#         wake_sleeping_threads_and_schedule_fd_io_and_harvest_dead_subprocesses from             src/lib/src/lib/thread-kit/src/posix/threadkit-driver-for-posix.pkg
#
#
# no_runnable_threads_left__hook
#     points to a fate that gets invoked when
#     there is nothing else to do.
#
#     Later this will get set to no_runnable_threads_left__fate from   src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg
#     by start_up_thread_scheduler'' in                                src/lib/src/lib/thread-kit/src/glue/thread-scheduler-control-g.pkg

##################################################
# This section contains data structures for
# inter-hostthread communication per
#
#     src/lib/std/src/hostthread/template-hostthread.pkg
#
pid = REF 0; # Unix process id of current process while server is running, otherwise zero.
#
# One record for each hostthread-level
# request supported by the server:
#
# Do_Echo carries a string plus a function taking a String -- presumably the handler
# passes the echoed text to 'reply'; the handler is not in the visible part of this file.
Do_Echo = { what: String, reply: String -> Void };
# The request sum type: either an echo request or an arbitrary thunk.
Request = DO_ECHO Do_Echo
| DO_THUNK (Void -> Void)
;
# Starts out empty.
request_queue = REF ([]: List(Request)); # Queue of pending requests from client hostthreads.
#
# Return TRUE iff no inter-hostthread requests are currently pending.
#
# We cannot just write   *request_queue == []   here because Request is
# not an equality type: the 'reply' fields are functions, and Mythryl
# does not support comparison of functions for equality.  So we test
# for emptiness by pattern-matching instead.
#
fun inter_hostthread_request_queue_is_empty ()
    =
    is_nil *request_queue
    where
        fun is_nil []  =>  TRUE;
            is_nil _   =>  FALSE;
        end;
    end;
# Nomenclature: What I'm calling "uninterruptible mode" is usually called "critical section" or "atomic region"
# in the literature. I dislike "critical" because it is vague. ("critical" in what sense? Who knows?)
# "atomic" is literally correct ("a-tomic" == "not cuttable" -- indivisible) but the modern reader is not
# likely to take it in that sense at first blush. And neither "section" nor "region" nor "scope" is as apropos as "mode".
#
# (The live definition of uninterruptible_scope_mutex is the alias for log::uninterruptible_scope_mutex near the top of this package.)
# uninterruptible_scope_mutex = REF 0; # Iff this counter > 0 then thread scheduler is in "uninterruptible mode" (aka "critical section", "atomic region" ...).
#
# Deferred-switch flag.  In the visible code it is read and cleared by exit_uninterruptible_scope,
# dispatch_next_thread__xu__noreturn and switch_to_thread__xu; per Note[A] below it is set by
# alarm_handler() when a timeslice expires inside an uninterruptible scope.
need_to_switch_threads_when_exiting_uninterruptible_scope = REF FALSE;
# Following is a little hack that allows us to call
# add_inter_hostthread_request_handler_thunks_to_run_queue' ()
# from most anywhere in the file without the
# whole file collapsing into mutual recursion:
#
# The hook holds NULL until a handler is installed.
add_inter_hostthread_request_handler_thunks_to_run_queue__hook # This hook gets set to add_inter_hostthread_request_handler_thunks_to_run_queue' at bottom of file.
=
REF (NULL: Null_Or( Void -> Void ));
#
# If client hostthreads have left anything in request_queue, call the
# handler currently stored in
# add_inter_hostthread_request_handler_thunks_to_run_queue__hook.
#
# Does nothing if the queue is empty.  Also does nothing if no handler
# has been installed yet: that shouldn't happen, and shouldn't matter
# if it does -- we'll just run the request a little later.
#
fun if_pending_requests_then_add_inter_hostthread_request_handler_thunks_to_run_queue ()
    =
    case (*request_queue, *add_inter_hostthread_request_handler_thunks_to_run_queue__hook)
        #
        ([], _)                   =>  ();                       # Nothing pending.
        (_,  NULL)                =>  ();                       # No handler installed.
        (_,  THE queue_handlers)  =>  queue_handlers ();
    esac;
#
#
##################################################
#
# Per-thread mail-activity tracking.
#
# We use this to attempt to distinguish
# mail-bound interactive foreground threads from
# CPU-bound background threads, with the
# idea of increasing system responsiveness
# by giving scheduling priority to foreground
# threads.
#
# Each microthread record carries a 'didmail' refcell;
# these three fns respectively set, clear and read it.
#
fun set_didmail_flag   (itt::MICROTHREAD { didmail, ... })   =    didmail := TRUE;
fun clear_didmail_flag (itt::MICROTHREAD { didmail, ... })   =    didmail := FALSE;
fun thread_did_mail    (itt::MICROTHREAD { didmail, ... })   =   *didmail;
# The two run queues.  Each entry pairs a microthread with the fate to resume it with.
background_run_queue = rwq::make_rw_queue () : rwq::Rw_Queue( (itt::Microthread, Fate(Void)) );
foreground_run_queue = rwq::make_rw_queue () : rwq::Rw_Queue( (itt::Microthread, Fate(Void)) ); # Exported.
#
# The thread ready queues:
#
# foreground_run_queue:
#
# This is for interactive foreground jobs
# needing prompt servicing. (Mail-bound jobs.)
#
# background_run_queue:
#
# This is for CPU-intensive jobs not needing
# quick response.
#
# In practice we consider a job to be 'mailbound' if
# we see it do a mailop in a given timeslice, otherwise
# we consider it to be CPU bound. We guarantee the
# 'mailbound' jobs half the available cycles.
#########################################################################
# Debug support
# =====================
#
fun thread_scheduler_statestring ()
    =
    # Construct and return a string summarizing the state
    # of the thread scheduler, looking something like:
    #
    #     "{ 1 3:1   fg_q=[5:1|6:1] bg_q=[8:2 9:2|] }"
    #
    # which decodes as:
    #
    #     1                   uninterruptible_scope_mutex is 1 -- we're in a "critical section".
    #     3:1                 Currently running microthread has thread_id=3 and belongs to task with task_id=1.
    #     fg_q=[5:1|6:1]      The foreground run queue has thread 5 (of task 1) in front half and thread 6 in back half.
    #     bg_q=[8:2 9:2|]     The background run queue has threads 8 and 9 in the front half and nothing in back half.
    #
    # Within each queue the '|' separates the front half from the back half.
    #
    # This string is expected to be used primarily in the construction of
    # messages to be logged via log::note() or logger::log_if() or such.
    {
        sprintf "{ %d %-5s fg_q=[%s] bg_q=[%s] }"                       # We don't include name of current-thread here because log::note already records it.
            *uninterruptible_scope_mutex
            (sprint_self())
            (sprint_runqueue foreground_run_queue)
            (sprint_runqueue background_run_queue);
    }
    where
        # Render one thread as "thread_id:task_id".
        #
        fun sprint_thread thread
            =
            {   thread ->  itt::MICROTHREAD { thread_id, task, ... };
                task   ->  itt::APPTASK     { task_id,         ... };
                #
                sprintf "%d:%d" thread_id task_id;
            };

        # Render the currently running thread.
        #
        fun sprint_self ()
            =
            sprint_thread (get_current_microthread());

        # Render one run queue as "<front entries>|<back entries>",
        # entries within each half being blank-separated.
        #
        fun sprint_runqueue q
            =
            {   (string::join " " (map sprint_q_entry (rwq::frontq q)))
                +
                "|"
                +
                (string::join " " (map sprint_q_entry (rwq::backq q)));
            }
            where
                fun sprint_q_entry (thread, _)                          # Second element is the thread's fate, which we do not print.
                    =
                    sprint_thread thread;
            end;
    end;
# Publish thread_scheduler_statestring through the hook in the log package.
my _ =
log::thread_scheduler_statestring__hook := thread_scheduler_statestring;
#
#########################################################################
#########################################################################
# Low-garbage debug support
# =========================
# The point of the following routines is/was to print debugging info to stdout
# while generating a minimum of garbage, for use in parts of the thread scheduler
# where an untimely garbage collection might trigger an untimely thread switch
# resulting in unanticipated new problems -- it is depressing when the debug
# logic produces even more buggy behavior.
#
# print_int width i
#
#     Print 'i' in decimal via fil::print, left-padded with blanks to
#     at least 'width' characters.  Re-invents int::to_string to reduce
#     the amount of garbage generated.
#
#     A negative 'i' is printed as '-' followed by its magnitude padded
#     to (width - 1).  (Previously any negative value fell through the
#     digit chain and was printed as "9", which is exactly wrong when
#     debugging a counter that has been decremented once too often.)
#     Negating the most negative Int may overflow; that case is not handled.
#
fun print_int width i
    =
    {
        if (i < 0)
            #
            fil::print "-";
            print_int (width - 1) (0 - i);

        elif (i < 10)
            #
            for (j = width; j > 1; --j) { fil::print " "; };            # Leading pad: width-1 blanks ahead of the single digit.
            #
            if   (i == 0)   fil::print "0";
            elif (i == 1)   fil::print "1";
            elif (i == 2)   fil::print "2";
            elif (i == 3)   fil::print "3";
            elif (i == 4)   fil::print "4";
            elif (i == 5)   fil::print "5";
            elif (i == 6)   fil::print "6";
            elif (i == 7)   fil::print "7";
            elif (i == 8)   fil::print "8";
            else            fil::print "9";
            fi;
        else
            print_int (width - 1) (i / 10);                             # All but the last digit, consuming the padding.
            print_int 0           (i % 10);                             # Last digit, unpadded.
        fi;
    };
#
# Low-garbage analogue of thread_scheduler_statestring:
# print the scheduler state directly via fil::print, as
#
#     { <mutex> running=<id>:<task>(<name>) <pad> fg_q=[<front>|<back>] bg_q=[<front>|<back>] }
#
# where '|' separates the front half of each run queue from its back half.
#
fun print_thread_scheduler_state ()
    =
    {   fil::print "{ ";            print_int 1 *uninterruptible_scope_mutex;
        fil::print " running=";     print_self ();
        fil::print " fg_q=[";       print_runqueue foreground_run_queue;    fil::print "]";
        fil::print " bg_q=[";       print_runqueue background_run_queue;    fil::print "]";
        fil::print " }";
    }
    where
        # Print "thread_id:task_id " for one thread.
        #
        fun print_thread thread
            =
            {
                thread ->  itt::MICROTHREAD { thread_id, task, ... };
                task   ->  itt::APPTASK     { task_id,         ... };
                print_int 2 thread_id;   fil::print ":";   print_int 1 task_id;
                fil::print " ";
            };

        # Print "thread_id:task_id(name) " for one thread, then blanks
        # so that short names are padded out to 60 bytes.
        #
        fun print_thread' thread
            =
            {
                thread ->  itt::MICROTHREAD { name, thread_id, task, ... };
                task   ->  itt::APPTASK     { task_id,               ... };
                print_int 2 thread_id;   fil::print ":";   print_int 1 task_id;
                fil::print "(";
                fil::print name;
                fil::print ") ";
                for (i = 60 - string::length_in_bytes name;  i > 0;  --i) { fil::print " "; };
            };

        # Print the currently running thread, with name.
        #
        fun print_self ()
            =
            print_thread' (get_current_microthread());

        # Print front half, '|', back half of one run queue.
        #
        fun print_runqueue q
            =
            {   fun print_entry (thread, _)                             # Second element is the thread's fate, which we do not print.
                    =
                    print_thread thread;

                frontq =  rwq::frontq q;
                backq  =  rwq::backq  q;
                #
                apply' frontq print_entry;
                fil::print "|";
                apply' backq  print_entry;
            };
    end;
#
#########################################################################
# Set a condition variable.
# Caller guarantees that this function is always
# executed in an uninterruptible scope.
#
# If the condvar is not yet set we mark it set and move every thread
# still blocked on it to the foreground run queue; waiters whose
# do1mailoprun has already completed are silently dropped.
#
# Raises DIE "condvar already set" if the condvar is in any other state.
#
fun set_condvar__iu (itt::CONDITION_VARIABLE state)                     # Only external call is in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
    =
    case *state
        #
        itt::CONDVAR_IS_NOT_SET waiting_threads                         # waiting_threads is the list of threads sitting
            =>                                                          # blocked waiting for this condvar to be set.
            {   foreground_run_queue ->  rwq::RW_QUEUE { back, ... };
                #
                state := itt::CONDVAR_IS_SET;                           # Set the condition variable.
                #
                back := run waiting_threads                             # Add to foreground run queue all threads that were waiting for condvar to be set.
                        where
                            fun run [] => *back;                        # Done: the existing back-list becomes the tail of the new one.
                                #
                                run ( { do1mailoprun_status=>REF itt::DO1MAILOPRUN_IS_COMPLETE, ... } ! rest)
                                    =>
                                    run rest;                           # Drop completed do1mailoprun.

                                run ( { do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread), finish_do1mailoprun, fate } ! rest)
                                    =>
                                    {   do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
                                        #
                                        finish_do1mailoprun ();         # Do stuff like do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks.

                                        # { thread -> itt::MICROTHREAD { thread_id, name, task, ... };
                                        # task -> itt::APPTASK { task_id, ... };
                                        # log::note {. sprintf "%s\tset_condvar__iu moving condvar-blocked task %d:%d (%s) to foreground run queue" (thread_scheduler_statestring()) thread_id task_id name; };
                                        # };

                                        (thread, fate) ! (run rest);
                                    };
                            end;
                        end;
            };

        _ => raise exception DIE "condvar already set";
    esac;
stipulate
    foreground_run_queue ->  rwq::RW_QUEUE fg;                          # 'fg' gives us direct access to the foreground queue's front/back refcells.

    # Debug check (currently disabled at both call sites below):
    # die via log::fatal if the given thread already appears
    # in either run queue.
    #
    fun crash_if_thread_is_already_in_run_queues (itt::MICROTHREAD { thread_id => id, name, ... }, _)
        =
        if (id >= itt::first_free_thread_id)                            # Ignore the special threads itt::run_thunk_immediately_thread which can legitimately be present multiple times on run queue.
            #
            scan_queue foreground_run_queue;
            scan_queue background_run_queue;
        fi
        where
            fun complain_if_same_id (itt::MICROTHREAD { thread_id, ... }, _)
                =
                if (thread_id == id)
                    #
                    log::fatal                                          # DOES NOT RETURN!
                        (sprintf "%s\tAttempted to run thread (%d=='%s') already in run queue!"  (thread_scheduler_statestring())  id  name);
                    ();
                fi;

            fun scan_queue (rwq::RW_QUEUE { back, front })
                =
                {   apply complain_if_same_id *back;
                    apply complain_if_same_id *front;
                };
        end;
herein

    # Enqueue a ready (thread, fate) pair on the foreground run queue.
    #
    fun push_thread_into_foreground_run_queue entry
        =
        {
#           crash_if_thread_is_already_in_run_queues entry;
            fg.back :=  entry ! *fg.back;
        };

    # Same, but first mark the thread as having done mail (== high priority).
    # Should call this only with thread-switching disabled...?
    #
    fun set_didmail_flag_and_push_thread_into_foreground_run_queue (entry as (microthread, _))          # Exported (as push_into_run_queue).
        =
        {   set_didmail_flag microthread;
            #
            push_thread_into_foreground_run_queue entry;
        };
end;

push_into_run_queue =  set_didmail_flag_and_push_thread_into_foreground_run_queue;                      # Exported.
#
# Exported.  Reppy's enqueueAndSwitchCurThread.
#
# Put (current thread, old_fate) on the foreground run queue with its
# mail flag set (== high priority), then make new_thread the current
# thread.  The new fate is whatever the caller does upon our return.
#
# Any pending inter-hostthread requests are handed to their handler first.
#
fun enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread, old_fate }
    =
    {   if_pending_requests_then_add_inter_hostthread_request_handler_thunks_to_run_queue ();
        #
        outgoing_thread =  get_current_microthread ();
        #
        set_didmail_flag_and_push_thread_into_foreground_run_queue (outgoing_thread, old_fate);
        #
        set_current_microthread new_thread;                             # Make it the currently executing thread.
    };
#
# Promote a thread from the 'cpu-bound' (background) queue to the
# 'io-bound' (foreground) queue.  Derived from Reppy's promote().
#
# Takes the entry at the front of the background queue, if there is
# one, and pushes it onto the foreground queue; otherwise does nothing.
#
fun promote_some_background_thread_to_foreground ()
    =
    case (rwq::take_from_front_of_queue background_run_queue)
        #
        NULL        =>  ();
        THE entry   =>  push_thread_into_foreground_run_queue entry;
    esac;
#
# Select the next (thread, fate) pair to run.
#
# dequeue_thread_preferably_an_io_bound_one ()          Reppy's dequeue1().
#     Takes from the foreground ('I/O bound') queue if it has anything,
#     otherwise falls through to dequeue_cpu_bound_thread.
#
# dequeue_cpu_bound_thread ()                           Reppy's dequeue2().
#     Takes from the background ('cpu-bound') queue; if that is empty too, returns
#     (itt::no_runnable_threads_left_thread, *no_runnable_threads_left__hook).
#
# Both fns apply the same liveness filter to every candidate:
#
#   o An ALIVE thread in an ALIVE task is returned.
#   o An ALIVE thread in a dead task inherits the task's state, decrements the
#     task's alive_threads_count, has its done_condvar set, and is dropped.
#   o A thread which is already non-ALIVE is just dropped.
#
# When a queue's front list is empty but its back list is not, we move the
# reversed back list to the front and retry, so that entries arriving this
# way go through the liveness filter too.  (Formerly the foreground version
# returned the head of the reversed list unchecked, so a dead thread could
# be dispatched; the background version already refilled-and-retried.)
#
# Since we call set_condvar__iu, we must be called in an uninterruptible scope.
#
fun dequeue_thread_preferably_an_io_bound_one ()
    =
    case foreground_run_queue
        #
        rwq::RW_QUEUE { front => REF [], back => REF [] }
            =>
            dequeue_cpu_bound_thread ();                                # No I/O-bound threads, settle for a cpu-bound one.

        rwq::RW_QUEUE { front as (REF []), back as (REF l) }
            =>
            {   back  := [];                                            # Refill front from back, then retry so the
                front := reverse (l, []);                               # new head passes through the liveness filter.
                dequeue_thread_preferably_an_io_bound_one ();
            };

        rwq::RW_QUEUE { front as (REF (first ! rest)), ... }
            =>
            {   front := rest;
                #
                case first
                    #
                    ( itt::MICROTHREAD { state as REF itt::state::ALIVE,
                                         task  => itt::APPTASK { task_state, alive_threads_count, ... },
                                         done_condvar,
                                         ...
                                       },
                      _                                                 # Thread fate.
                    )
                        => case *task_state
                               #
                               itt::state::ALIVE => first;              # ALIVE thread in ALIVE task -- go ahead and run it.

                               deadstate => {
                                   state := deadstate;                                  # Task is dead, so we're dead too. Remember why we're dead.
                                   alive_threads_count := *alive_threads_count - 1;     # One less ALIVE thread in this task.
                                   set_condvar__iu done_condvar;                        # Tell the world the thread is now non-ALIVE.
                                   dequeue_thread_preferably_an_io_bound_one ();        # Thread is not ALIVE, so drop it on the floor and try again.
                               };
                           esac;

                    _   => {
                               dequeue_thread_preferably_an_io_bound_one ();            # Thread is not ALIVE, so drop it on the floor and try again.
                           };
                esac;
            };
    esac

also
fun dequeue_cpu_bound_thread ()                                         # Remove a thread from the background queue, which holds 'cpu-bound' threads.
    =                                                                   # We will be called only if the io-bound thread queue is empty:
    case background_run_queue
        #
        rwq::RW_QUEUE { front => REF [], back => REF [] }
            =>
            {
                (itt::no_runnable_threads_left_thread, *no_runnable_threads_left__hook);
            };

        rwq::RW_QUEUE { front as REF [], back as REF l }
            =>
            {   back  := [];
                front := reverse (l, []);
                dequeue_cpu_bound_thread ();
            };

        rwq::RW_QUEUE { front as REF (first ! rest), ... }
            =>
            {   front := rest;
                #
                case first
                    #
                    ( itt::MICROTHREAD { state as REF itt::state::ALIVE,
                                         task  => itt::APPTASK { task_state, alive_threads_count, ... },
                                         done_condvar,
                                         ...
                                       },
                      _                                                 # Thread fate.
                    )
                        => case *task_state
                               #
                               itt::state::ALIVE => first;              # ALIVE thread in ALIVE task -- go ahead and run it.

                               deadstate => {
                                   state := deadstate;                                  # Task is dead, so we're dead too. Remember why we're dead.
                                   alive_threads_count := *alive_threads_count - 1;     # One less ALIVE thread in this task.
                                   set_condvar__iu done_condvar;                        # Tell the world the thread is now non-ALIVE.
                                   dequeue_cpu_bound_thread ();                         # Thread is not ALIVE, so drop it on the floor and try again.
                               };
                           esac;

                    _   => {
                               dequeue_cpu_bound_thread ();                             # Thread is not ALIVE, so drop it on the floor and try again.
                           };
                esac;
            };
    esac;
# fun enqueueSchedulerHook () = let
# myfate
# =
# call_with_current_fate (\\ fate => (
# call_with_current_fate (\\ fate' => switch_to_fate fate fate');
# dispatchSchedulerHook ()))
#
# my rwq::RW_QUEUE { front, ... } = rdyQ1
# in
# front := (dummyTid, myfate) ! *front
# end
#
# Return the current value of the uninterruptible-scope counter
# (zero when not in an uninterruptible scope).
fun get_uninterruptible_scope_nesting_depth () # A unit-test support hack of no general interest.
=
*uninterruptible_scope_mutex;
#
# Derived from Reppy's dispatchSchedulerHook.
#
# Transfer control to the fate currently stored in run_next_runnable_thread__xu__hook.
# This fn itself does not touch uninterruptible_scope_mutex.
#
# XXX OLD COMMENT: We do not need to clear uninterruptible mode here because
# the hook will immediately do an enter_uninterruptible_scope.
fun run_next_runnable_thread__xu ()
=
switch_to_fate *run_next_runnable_thread__xu__hook (); #
#
# Exported.
#
# Sanity check: complain and die via log::fatal if we are currently
# inside an uninterruptible scope, i.e. if *uninterruptible_scope_mutex
# is nonzero.  'which' names the caller and is included in the message.
#
# There's actually nothing wrong with nesting uninterruptible scopes, but
# allowing that can mask errors like failing to end an uninterruptible
# scope, so during initial checkout I'm making this be an error.
#
fun assert_not_in_uninterruptible_scope which
    =
    {
#       if (*log::debugging) log::note_on_stderr {. sprintf "%s\tassert_not_in_uninterruptible_scope %s" (log::debug_statestring()) which; }; fi;

        if (*uninterruptible_scope_mutex != 0)
            #
            complaint
                =
                sprintf "assert_not_in_uninterruptible_scope(%s): *uninterruptible_scope_mutex d=%d (0 is expected) -- exiting uncleanly <================" which *uninterruptible_scope_mutex;

            log::note_in_ramlog {. complaint; };
            log::note_on_stderr {. complaint; };
            log::fatal             complaint;
            ();
        fi;
    };
#
# Increment the uninterruptible-scope counter.  Calls nest: each call adds one.
fun enter_uninterruptible_scope () # Exported. Enter a critical section. Derived from Reppy's atomicBegin.
=
uninterruptible_scope_mutex # Record that until further notice, switching microthreads is Not Ok.
:= # Reppy used a boolean here, but Dijkstra-style P/V counters
*uninterruptible_scope_mutex + 1; # nest better -- and in fact would have prevented a bug I found in his code. -- CrT 2012-08-16
#
# Leave one level of uninterruptible scope.
#
# Common case: no thread switch is pending, or we are still nested inside
# an outer uninterruptible scope (counter > 1) -- just decrement the counter.
#
# Otherwise a switch was requested while we were uninterruptible and this is the
# outermost exit: clear the request, capture our own fate, put (current thread, fate)
# on the foreground run queue and jump to the run_next_runnable_thread__xu__hook fate.
# We continue from here when that queue entry is eventually dispatched.
#
# NOTE(review): the counter is not decremented on this second path; presumably it is
# reset by whoever dispatches the next thread (dispatch_next_thread__xu__noreturn zeroes it) -- confirm.
fun exit_uninterruptible_scope () # Exported. Exit a critical section. Derived from Reppy's atomicEnd.
=
if (not *need_to_switch_threads_when_exiting_uninterruptible_scope
or *uninterruptible_scope_mutex > 1)
#
uninterruptible_scope_mutex := *uninterruptible_scope_mutex - 1;
else
need_to_switch_threads_when_exiting_uninterruptible_scope := FALSE;
call_with_current_fate
(
\\ fate = {
push_thread_into_foreground_run_queue (get_current_microthread(), fate);
#
run_next_runnable_thread__xu ();
}
);
fi;
#
# Exported.  NEVER RETURNS TO CALLER.  Derived from Reppy's atomicDispatch.
#
# This is the standard exit-to-thread-scheduler call used by packages like
# maildrop or mailqueue when they are in an uninterruptible scope and must
# block the current thread due (e.g.) to being empty:  They put the executing
# thread in a wait queue and then call us to exit the uninterruptible scope and
# then find a new thread to run.
#
# Must be called with *uninterruptible_scope_mutex == 1 exactly; any other
# value is treated as an internal bug and we exit the process uncleanly.
#
fun dispatch_next_thread__xu__noreturn ()
    =
    {
        if_pending_requests_then_add_inter_hostthread_request_handler_thunks_to_run_queue ();

        if (*uninterruptible_scope_mutex != 1)
            bar = "===============================================================================================================================================\n";
            print "\n";
            print bar;
            print bar;
            print bar;
            printf "dispatch_next_thread__xu__noreturn: THREADKIT INTERNAL BUG: *uninterruptible_scope_mutex has bogus value d=%d (should be 1) so shutting down.\n"
                *uninterruptible_scope_mutex;
            print bar;
            print bar;
            print bar;
            wxp::exit_uncleanly wxp::failure;                           # A clean exit would try to run more timeslicing-based exit code, which isn't a good idea with timeslicing bollixed this badly.
        fi;

        if (/* *uninterruptible_scope_mutex == 1 and */                 # Above check makes this redundant.
            *need_to_switch_threads_when_exiting_uninterruptible_scope)
            #
            need_to_switch_threads_when_exiting_uninterruptible_scope := FALSE;
            run_next_runnable_thread__xu ();                            # A deferred timeslice switch is pending: hand off to the hook fate instead.
        fi;

        # The main idea of the next two lines is that when
        #
        #     no_runnable_threads_left__fate
        # in
        #     src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg
        #
        # does    mps::block_until_inter_hostthread_request_queue_is_nonempty ();
        #         mps::dispatch_next_thread__xu__noreturn ();
        #
        # that here we want to process any inter-hostthread requests before
        # running dequeue_thread_preferably_an_io_bound_one, since the inter-hostthread requests
        # may queue up threads for us to process in an otherwise thread-starved
        # situation. I hope setting thread_scheduler_state to IN_UNINTERRUPTIBLE_SCOPE
        # will make this work:
        #                            -- 2012-07-21 CrT
        #
        # thread_scheduler_state := IN_UNINTERRUPTIBLE_SCOPE;
        # service_inter_hostthread_requests (get_any_new_inter_hostthread_requests());     # Locks up in badbricks about 25% of the time with this line in.

        (dequeue_thread_preferably_an_io_bound_one ())
            ->
            (thread, fate);

        set_current_microthread thread;
        #
        uninterruptible_scope_mutex := 0;                               # Leave the uninterruptible scope just before resuming the chosen thread.
        switch_to_fate fate ();
    };
#
# Variant of dispatch_next_thread__xu__noreturn for callers which are not in an uninterruptible scope:
# verify that the counter is zero, set it to 1 (log::uninterruptible_scope_mutex is the same
# refcell as uninterruptible_scope_mutex), then hand off to dispatch_next_thread__xu__noreturn.
fun dispatch_next_thread__noreturn () # Exported. NEVER RETURNS TO CALLER. Derived from Reppy's dispatch().
= # Called when NOT in a critical section.
{
assert_not_in_uninterruptible_scope "dispatch_next_thread__noreturn";
log::uninterruptible_scope_mutex := 1;
#
dispatch_next_thread__xu__noreturn ();
};
#
# Debug helper for the thread-switching code:  when log::debugging is set,
# log a line giving the scheduler state, the currently running ("old")
# thread, the given ("new") thread, and 'msg'.  Threads are shown as
# thread_id:task_id(name).
#
fun lognote (msg, thread)
    =
    {
        get_current_microthread()
            ->
            itt::MICROTHREAD { thread_id => from_id,  name => from_name,  task => itt::APPTASK { task_id => from_task_id, ... },  ... };

        thread
            ->
            itt::MICROTHREAD { thread_id => to_id,    name => to_name,    task => itt::APPTASK { task_id => to_task_id,   ... },  ... };

        if (*log::debugging)
            #
            log::note {. sprintf "%s\t old=%d:%d(%s) new=%d:%d(%s) %s" (thread_scheduler_statestring()) from_id from_task_id from_name to_id to_task_id to_name msg; };
        fi;
    };
#
# Exported.  Derived from Reppy's atomicSwitchTo.
#
# Switch to requested_thread / requested_fate / requested_arg while leaving
# an uninterruptible scope.  The calling thread, together with the fate
# of this call, is saved on the foreground run queue for later resumption.
#
# Called only from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailslot.pkg
#
fun switch_to_thread__xu (requested_thread, requested_fate, requested_arg)
    =
    {
        if_pending_requests_then_add_inter_hostthread_request_handler_thunks_to_run_queue ();
        lognote("switch_to_thread__xu/AAA calling nested call_with_current_fate()s",requested_thread);

        call_with_current_fate (
            #
            \\ old_fate
                =
                {   mode = *uninterruptible_scope_mutex;
                    #
                    if (mode == 1 and *need_to_switch_threads_when_exiting_uninterruptible_scope)
                        #
                        call_with_current_fate                          # This is the abnormal path through this fn -- see Note[A] below.
                            (
                              \\ requested_fate'                        # Get an explicit representation of currently running fate.
                                  =
                                  {
                                      lognote("switch_to_thread__xu/BBB calling push_thread_into_foreground_run_queue on BOTH",requested_thread);
                                      old_thread = get_current_microthread();
                                      push_thread_into_foreground_run_queue (requested_thread, requested_fate');        # requested_fate' will immediately do below 'switch_to_fate requested_fate requested_arg;'
                                      push_thread_into_foreground_run_queue (old_thread, old_fate);
                                      need_to_switch_threads_when_exiting_uninterruptible_scope := FALSE;
                                      run_next_runnable_thread__xu ();  # "Now for something completely different."
                                  }
                            );
                    else
                        lognote("switch_to_thread__xu/PPP calling enqueue_old_thread_plus_fate",requested_thread);
                        enqueue_old_thread_plus_old_fate_then_install_new_thread        # Put old thread+fate on run queue for later resumption.
                          {
                            new_thread => requested_thread,
                            old_fate
                          };
                        uninterruptible_scope_mutex := mode - 1;        # Exit uninterruptible scope.
                    fi;

                    if (*log::debugging) log::note {. sprintf "%s\tswitch_to_thread__xu/QQQ back from enqueue_old_thread_plus_fate or run_next_runnable_thread__xu, calling switch_to_fate" (thread_scheduler_statestring()); }; fi;

                    switch_to_fate requested_fate requested_arg;        # Switch to requested_thread + requested_fate + requested_arg.
                }
        );
    };
##################################################################################
# Note[A].
# This is the abnormal path through this fn.
#
# While we were executing the uninterruptible
# scope alarm_handler() got a timeslicing signal
# that it is time to switch to the next thread
# in the run queue. Since the uninterruptible scope
# prevented us from doing so, we settled for setting
# need_to_switch_threads_when_exiting_uninterruptible_scope
# to TRUE; now we are exiting the uninterruptible scope
# so it is time to actually switch to next thread.
#
# Note that switching to caller-supplied 'requested_thread'/'requested_fate'
# is not enough; it and current thread might be a tightly coupled pair
# of threads performing a single logical task; we need
# to ensure that all runnable threads get a fair share
# of CPU time, so now it is time for "something completely different".
#
# Consequently at this point we are saving on the run queue
# BOTH the currently running thread+fate AND the caller-requested
# 'requested_thread'+'requested_fate' pair in favor of running
# something unrelated from the run queue (if available).
#
fun yield_to_next_thread__xu  fate                                              # Exported.  Derived from Reppy's atomicYield.
    =
    {
        # Voluntarily give up the CPU:  hand the calling microthread, paired with
        # 'fate' (where it should resume), to
        #     set_didmail_flag_and_push_thread_into_foreground_run_queue
        # and then let the dispatcher pick whatever runs next.
        # Per the __xu suffix convention, the dispatch call is what ends the
        # caller's uninterruptible scope.
        #
        # Debugging aid, normally disabled:
        #
        #   {   thread =  get_current_microthread();
        #       thread -> itt::MICROTHREAD { thread_id, name, task, ... };
        #       task   -> itt::APPTASK     { task_id, ... };
        #       log::note {. sprintf "%s\tyield_to_next_thread__xu %d:%d(%s)" (thread_scheduler_statestring()) thread_id task_id name; };
        #   };
        #
        yielding_thread =  get_current_microthread ();
        #
        set_didmail_flag_and_push_thread_into_foreground_run_queue  (yielding_thread, fate);
        #
        dispatch_next_thread__xu__noreturn ();
    };
# Use itt::run_thunk_thread to
# run given Void -> Void function and then exit.
#
# The thread is placed at the BACK of the BACKGROUND
# run queue for execution when its turn comes up.
#
fun run_thunk f # Exported. Derived from Reppy's enqueueTmpThread.
=
{
# Purpose:
# Package the Void -> Void function 'f' as a fate and add
# (itt::run_thunk_thread, that fate) to background_run_queue.
# 'f' is NOT executed by this call; it runs when the queued
# fate is eventually resumed by the scheduler. Any exception
# raised by 'f' is silently discarded.
#
# "We should do this, but the overhead
# is too high right now:" -- John H Reppy circa 1992 XXX SUCKO FIXME
#
# myfate = fat::make_isolated_fate f
#
# so instead we do this:
#
# The outer call_with_current_fate captures 'fate' == "the rest of run_thunk".
# The inner one captures 'fate_to_run' == "the code following the inner call"
# and immediately switches back to 'fate', handing it fate_to_run as its value.
# Net effect: the whole parenthesized expression evaluates to fate_to_run,
# and the 'f ()' line below is skipped for now.
#
(call_with_current_fate
#
(\\ fate # This temporary fate lets us pick up again after generating fate_to_run.
=
{
call_with_current_fate
#
(\\ fate_to_run = switch_to_fate fate fate_to_run); #
#
f () # This is the body of fate_to_run, which we will enter onto run queue.
except
_ = (); # Swallow every exception from 'f' -- deliberate best-effort.
dispatch_next_thread__noreturn (); # This will shut down fate_to_run when done.
}
))
-> fate_to_run;
background_run_queue
->
rwq::RW_QUEUE q; # Expose the queue's record so we can update its 'back' field directly.
assert_not_in_uninterruptible_scope "run_thunk"; # The string names the caller being checked.
log::uninterruptible_scope_mutex := 1; # Reppy didn't guard this q op, curiously.
#
q.back := (itt::run_thunk_thread, fate_to_run) ! *q.back; # Enter thread at back of background run queue.
#
log::uninterruptible_scope_mutex := 0; # Unconditionally reset to zero (we asserted above that we were not already in uninterruptible scope).
};
#
fun run_thunks  thunks
    =
    # Convenience wrapper:  hand each element of 'thunks' to run_thunk.
    #
    apply  (\\ thunk = run_thunk thunk)  thunks;
# Use itt::run_thunk_soon_thread to
# run given Void -> Void function and then exit.
#
# The thread is placed at the BACK of the FOREGROUND run queue.
#
fun run_thunk_soon f # Exported. Derived from Reppy's enqueueTmpThread.
=
{
# Purpose:
# Package the Void -> Void function 'f' as a fate and add
# (itt::run_thunk_soon_thread, that fate) to foreground_run_queue.
# 'f' is NOT executed by this call; it runs when the queued
# fate is eventually resumed. Any exception raised by 'f'
# is silently discarded.
#
# "We should do this, but the overhead
# is too high right now:" -- John H Reppy circa 1992 XXX SUCKO FIXME
#
# myfate = fat::make_isolated_fate f
#
# so instead we do this:
#
# Outer call_with_current_fate captures 'fate' (the rest of this function);
# inner one captures 'fate_to_run' (the code after the inner call) and jumps
# straight back to 'fate' with fate_to_run as the value, so the parenthesized
# expression yields fate_to_run without running 'f' yet.
#
(call_with_current_fate
#
(\\ fate # This temporary fate lets us pick up again after generating fate_to_run.
=
{
call_with_current_fate
#
(\\ fate_to_run = switch_to_fate fate fate_to_run); #
#
f () # This is the body of fate_to_run, which we will enter onto run queue.
except
_ = (); # Swallow every exception from 'f' -- deliberate best-effort.
dispatch_next_thread__noreturn (); # This will shut down fate_to_run when done.
}
))
-> fate_to_run;
foreground_run_queue
->
rwq::RW_QUEUE q; # Expose the queue's record so we can update its 'back' field directly.
assert_not_in_uninterruptible_scope "run_thunk_soon"; # The string names the caller being checked.
log::uninterruptible_scope_mutex := 1; # Reppy didn't guard this q op, curiously.
#
q.back := (itt::run_thunk_soon_thread, fate_to_run) ! *q.back; # Enter thread at back of foreground run queue.
#
log::uninterruptible_scope_mutex := 0; # Unconditionally reset to zero (we asserted above that we were not already in uninterruptible scope).
};
# Same as above, but called from inside an uninterruptible scope,
# so we don't need to do enter_uninterruptible_scope()/exit_uninterruptible_scope().
#
fun run_thunk_soon__iu f # Private to this file.
=
{
# Purpose:
# Variant of run_thunk_soon for callers already inside an
# uninterruptible scope: builds a fate for the Void -> Void
# function 'f' and adds (itt::run_thunk_soon_thread, fate) to
# foreground_run_queue WITHOUT touching the uninterruptible-scope
# mutex. Nothing here checks that the caller really is in
# uninterruptible scope. Exceptions raised by 'f' are discarded.
#
# "We should do this, but the overhead
# is too high right now:" -- John H Reppy circa 1992 XXX SUCKO FIXME
#
# myfate = fat::make_isolated_fate f
#
# so instead we do this:
#
# Outer call_with_current_fate captures 'fate' (the rest of this function);
# inner one captures 'fate_to_run' (the code after the inner call) and jumps
# straight back to 'fate' with fate_to_run as the value, so the parenthesized
# expression yields fate_to_run without running 'f' yet.
#
(call_with_current_fate
#
(\\ fate # This temporary fate lets us pick up again after generating fate_to_run.
=
{
call_with_current_fate
#
(\\ fate_to_run = switch_to_fate fate fate_to_run); #
#
f () # This is the body of fate_to_run, which we will enter onto run queue.
except
_ = (); # Swallow every exception from 'f' -- deliberate best-effort.
dispatch_next_thread__noreturn (); # This will shut down fate_to_run when done.
}
))
-> fate_to_run;
foreground_run_queue
->
rwq::RW_QUEUE q; # Expose the queue's record so we can update its 'back' field directly.
# log::uninterruptible_scope_mutex := 1; # We don't do this because caller has already done it.
#
q.back := (itt::run_thunk_soon_thread, fate_to_run) ! *q.back; # Enter thread at back of foreground run queue.
#
# log::uninterruptible_scope_mutex := 0; # Caller set this, so trust caller to clear it.
};
# Use itt::run_thunk_immediately_thread to
# run given Void -> Void function and then exit.
#
# At present this is (only) used in:
#     src/lib/src/lib/thread-kit/src/process-deathwatch.pkg
#
# NB: The thread is jumped directly to the FRONT of the FOREGROUND run queue,
# instead of the usual procedure of starting at the back
# of the queue and waiting its turn. This is UNFAIR SCHEDULING
# which could lead to THREAD STARVATION if used too often.
#
fun run_thunk_immediately__iu f # Exported. Derived from Reppy's enqueueTmpThread.
=
{
# Purpose:
# Build a fate for the Void -> Void function 'f' and put
# (itt::run_thunk_immediately_thread, fate) at the FRONT of
# foreground_run_queue. Must be called from inside an
# uninterruptible scope: if *uninterruptible_scope_mutex is zero
# we log to the ramlog and terminate the process uncleanly.
# Exceptions raised by 'f' are discarded.
#
# "We should do this, but the overhead
# is too high right now:" -- John H Reppy circa 1992 XXX SUCKO FIXME
#
# myfate = fat::make_isolated_fate f
#
# so instead we do this:
#
# Outer call_with_current_fate captures 'fate' (the rest of this function);
# inner one captures 'fate_to_run' (the code after the inner call) and jumps
# straight back to 'fate' with fate_to_run as the value, so the parenthesized
# expression yields fate_to_run without running 'f' yet.
#
(call_with_current_fate
#
(\\ fate # This temporary fate lets us pick up again after generating fate_to_run.
=
{
call_with_current_fate
#
(\\ fate_to_run = switch_to_fate fate fate_to_run); #
#
f () # This is the body of fate_to_run, which we will enter onto run queue.
except
_ = (); # Swallow every exception from 'f' -- deliberate best-effort.
dispatch_next_thread__noreturn (); # This will shut down fate_to_run when done.
}
))
-> fate_to_run;
foreground_run_queue
->
rwq::RW_QUEUE q; # Expose the queue's record so we can update its 'front' field directly.
if (*uninterruptible_scope_mutex == 0) # Caller violated the __iu contract.
log::note_in_ramlog {. "run_thunk_immediately__iu: not called from uninterruptible scope!"; };
wxp::exit_uncleanly wxp::failure; # A clean exit would try to run more timeslicing-based exit code, which isn't a good idea with timeslicing bollixed this badly.
fi;
# log::uninterruptible_scope_mutex := 1; # We don't need this call because we're only called from inside an uninterruptible scope anyhow.
#
q.front := (itt::run_thunk_immediately_thread, fate_to_run) ! *q.front; # Jump run_thunk_immediately_thread directly to the head of the foreground run queue.
#
# log::uninterruptible_scope_mutex := 0;
};
# Fate which, when resumed, runs dispatch_next_thread__xu__noreturn.
# reset_thread_scheduler installs it as the value of
# run_next_runnable_thread__xu__hook.
#
default_scheduler_fate
=
fat::make_isolated_fate dispatch_next_thread__xu__noreturn
:
Fate( Void );
# Per-timeslice cache of the current time; NULL means "not known".
# Written by get_approximate_time, cleared by alarm_handler,
# by reset_thread_scheduler and at STARTUP_PHASE_1.
#
cached_approximate_time
=
REF (NULL: Null_Or( tim::Time ));
#
# The point of this variable is
# to make the current system time
# available to threads without
# having them all constantly making
# expensive system calls to find out
# the time.
#
# The idea is to cache the current time
# (when known) and just re-use it through
# the end of the current timeslice.
#
# It is cleared to NULL at the end
# of each timeslice.
#
# When get_approximate_time is called,
# it just returns the value of this
# variable if set, otherwise it makes
# the required system call and caches
# the result in this variable before
# returning it.
# Time is from src/lib/std/src/time.api
# time is from src/lib/std/time.pkg
# time_guts is from src/lib/std/src/time-guts.pkg
#
# Return an approximation of the
# current time of day. This is at
# least as accurate as the time quantum:
#
fun get_approximate_time ()                                                     # Exported.
    =
    # Answer from the per-timeslice cache when it is populated;
    # otherwise make the (expensive) time query once, remember
    # the answer in cached_approximate_time, and return it.
    #
    case *cached_approximate_time
        #
        NULL       =>   {   now =  tim::get_current_time_utc ();
                            #
                            cached_approximate_time :=  THE now;
                            #
                            now;
                        };
        #
        THE cached =>   cached;
    esac;
#
fun preempt__eu  current_fate                                                   # Preempt the current thread, which is to resume later at 'current_fate'.
    =
    {
        # Re-queue the currently running microthread so that something else can run:
        #
        #   o If thread_did_mail reports true for it, clear that flag, promote some
        #     background thread to foreground, and put it on the foreground run queue.
        #
        #   o Otherwise put it on the back of the background run queue.
        #
        # We ENTER uninterruptible scope here and deliberately do NOT leave it:
        # we must remain uninterruptible until the next job has been pulled out
        # of the run queues, otherwise the current thread can get pushed on the
        # run queue two or more times before any thread is unqueued.
        # (During debugging I have observed the system locking up due to doing
        # this hundreds of times. -- CrT)
        # So we depend upon our caller to exit uninterruptible mode.
        #
        assert_not_in_uninterruptible_scope "preempt__eu";
        log::uninterruptible_scope_mutex := 1;                                  # Reppy didn't have this, but that looks like a bug, or at least undocumented fragility.
        #
        preempted_thread =  get_current_microthread ();
        #
        if (thread_did_mail preempted_thread)
            #
            clear_didmail_flag  preempted_thread;
            promote_some_background_thread_to_foreground ();
            push_thread_into_foreground_run_queue  (preempted_thread, current_fate);
        else
            rwq::put_on_back_of_queue  (background_run_queue,  (preempted_thread, current_fate));
        fi;
    };
# This is the function which
# drives timeslicing. It is
# invoked at (typically) 50Hz
# by the posix ALRM signal.
#
# (Note: The runtime does
# not call us when the signal
# is actually received, but
# rather at the next garbage
# collection check point, when
# the state of the heap is
# well defined.)
#
# RETURN VALUE:
# We return the fate ("continuation")
# to be run on completion of handling
# this signal.
# Our 'fate' argument is the code
# that was running when the alarm_handler
# interrupted it, so we can resume the
# interrupted microthread simply by returning
# this fate.
# Alternatively we may switch microthreads
# by returning the fate corresponding to
# some other ready-to-run microthread.
#
fun alarm_handler                                                               # Called (only) via root_mythryl_handler_for_interprocess_signals in   src/lib/std/src/nj/interprocess-signals-guts.pkg
    (
      signal:        is::Signal,                                                # Signal number, in this case for SIGALRM or SIGUSR1.  Unused here.
      count:         Int,                                                       # Count of times signal has happened since our last call, from c_signal_handler in src/c/machine-dependent/interprocess-signals.c  Unused here.
      current_fate:  fate::Fate(Void)                                           # Fate ("continuation") which was interrupted to run us.
    )
    =
    {
        # Timeslicing entry point.
        #
        # Returns the fate to resume once signal handling completes:
        #
        #   o  *run_next_runnable_thread__xu__hook  when thread switching is currently permitted
        #      (after re-queueing the interrupted thread via preempt__eu);
        #
        #   o  current_fate, i.e. "carry on with the interrupted thread", when either
        #      *uninterruptible_scope_mutex or *runtime::microthread_switch_lock_refcell__global
        #      is nonzero.  In that case we set
        #      need_to_switch_threads_when_exiting_uninterruptible_scope so the switch
        #      can be done later.


        # Instrumentation:
        #
        alarm_handler_calls := *alarm_handler_calls + 1;

#       disable_debug_ramlogging := (posixlib::getenv "DISABLE_DEBUG_RAMLOGGING" != NULL);
        #
        if (*uninterruptible_scope_mutex != 0)
            alarm_handler_calls_with__uninterruptible_scope_mutex__set := *alarm_handler_calls_with__uninterruptible_scope_mutex__set + 1;
        fi;
        #
        if (*runtime::microthread_switch_lock_refcell__global != 0)
            alarm_handler_calls_with__microthread_switch_lock__set := *alarm_handler_calls_with__microthread_switch_lock__set + 1;
        fi;

        cached_approximate_time := NULL;
            #
            # We keep cached_approximate_time
            # accurate to one time quantum.
            #
            # Since we just started
            # a new timeslice we
            # clear it -- we no longer
            # know what time it is.

        # Floods ram.log:
        # log::note_in_ramlog {. sprintf "alarm handler *uninterruptible_scope_mutex d=%d *runtime::microthread_switch_lock_refcell__global d=%d" *uninterruptible_scope_mutex *runtime::microthread_switch_lock_refcell__global; };

        if (*uninterruptible_scope_mutex == 0 and *runtime::microthread_switch_lock_refcell__global == 0)
            #
            # Floods ram.log:
            # log::note_in_ramlog {. sprintf "alarm handler calling if_pending_requests_then_add_inter_hostthread_request_handler_thunks_to_run_queue *uninterruptible_scope_mutex d=%d" *uninterruptible_scope_mutex; };
            #
            if_pending_requests_then_add_inter_hostthread_request_handler_thunks_to_run_queue ();
            #
            preempt__eu current_fate;                                           # Put current thread to sleep so we can give someone else a chance to run.  NB: this leaves us in uninterruptible scope.
            *run_next_runnable_thread__xu__hook;                                # Invoke scheduler thread, which will then select next user thread to run.
        else
            # We're in a critical section
            # (in plain English: thread switching
            # is disabled), so we cannot preempt
            # the currently running thread.
            #
            # So instead we continue running
            # the current thread after making a note
            # to preempt the current thread
            # as soon as it exits the critical section
            # (which is to say, re-enables thread switching):
            #
            need_to_switch_threads_when_exiting_uninterruptible_scope := TRUE;  # Might already be TRUE; thass ok.
            #
            current_fate;
        fi;
    };
kill_count = REF 0;                                                             # Number of times kill_handler has been invoked.
#
fun kill_handler                                                                # Called (only) via root_mythryl_handler_for_interprocess_signals in   src/lib/std/src/nj/interprocess-signals-guts.pkg
    (
      _,                                                                        # Signal number, in this case for SIGKILL.
      _,                                                                        # Count of times signal has happened since our last call, from c_signal_handler in src/c/machine-dependent/interprocess-signals.c
      current_fate                                                              # Fate ("continuation") which was interrupted to run us.
    )
    =
    {
        # Count the signal and resume the interrupted code unchanged.
        #
        kill_count := *kill_count + 1;
        #
        current_fate;
#       fate::switch_to_fate  current_fate  ();
    };
# By default we time-slice at 20ms (50Hz),
# but the user can change this via the
# start_up_thread_scheduler time quantum argument:
#
# Fallback timeslice length: 20 milliseconds.
# start_thread_scheduler_timer uses it when handed
# a quantum which is not greater than tim::zero_time.
#
default_time_quantum
=
tim::from_milliseconds 20;
# Timeslice length currently in force.
# Written by start_thread_scheduler_timer,
# read by restart_thread_scheduler_timer.
#
time_quantum
=
REF default_time_quantum;
stipulate
supporting_hostthreads_are_running = REF FALSE; # TRUE between a successful start_supporting_hostthreads_if_not_running() and the next stop_supporting_hostthreads_if_running(); also reset to FALSE at STARTUP_PHASE_1.
#
per_who = "thread-scheduler"; # Log our actions under this name.
herein
#
fun start_supporting_hostthreads_if_not_running ()                              # Private to this file; called at link time, from startup_phase_11_fn and from start_thread_scheduler_timer.
    =
    # Start the cpu-server, io-server and io-wait hostthreads.
    #
    # For now we start these but never stop them, and piggyback
    # it on the regular start_thread_scheduler_timer call to
    # avoid changing the externally visible api.
    #
    # This call is designed to be a no-op
    # on second and subsequent calls:
    #
    case *supporting_hostthreads_are_running
        #
        TRUE  =>    ();
        #
        FALSE =>    {
                        # A quick sanity check: the microthread_switch_lock_refcell logic in
                        #     src/c/hostthread/hostthread-on-posix-threads.c
                        # depends on our task->hostthread->id being 1, so check that it is to
                        # avoid really obscure failures if it changes to being something else:
                        #
                        id =  hth::hostthread_to_int (hth::get_hostthread());
                        #
                        if (id != microthread_scheduler_hostthread_id)
                            printf "fatal error: hostthread id is %d -- need it to be %d\n" id microthread_scheduler_hostthread_id;
                            wxp::exit_uncleanly_x(1);
                        fi;

                        # The original plan was one fewer cycleserver hostthreads
                        # than cores (leaving one core for foreground processing)
                        # and the same number of io hostthreads.  See Note[1] for
                        # why this is instead hardwired:  the '8' was originally
                        # max (1, hth::get_cpu_core_count() - 1) but that lost.           XXX SUCKO FIXME
                        #
                        hostthreads_to_start = 8;

                        cpu::change_number_of_server_hostthreads_to  per_who  hostthreads_to_start;     # Start cpu-server hostthreads.
                        io::change_number_of_server_hostthreads_to   per_who  hostthreads_to_start;     # Start io-server hostthreads.

                        # Originally I was starting 'hostthreads_to_start' I/O threads but that can result in
                        # unintuitive, unexpected and downright incorrect out-of-order execution of requests.
                        #
                        iow::start_server_hostthread_if_not_running  per_who;

                        supporting_hostthreads_are_running := TRUE;
                    };
    esac;
#
fun stop_supporting_hostthreads_if_running ()                                   # Private to this file; called from shutdown_phase_4_fn.
    =
    # Our charter here is to undo whatever
    # start_supporting_hostthreads_if_not_running () did.
    # A no-op when the supporting hostthreads are not marked as running.
    #
    case *supporting_hostthreads_are_running
        #
        FALSE =>    ();
        #
        TRUE  =>    {   iow::stop_server_hostthread_if_running { per_who, reply => (\\ _ = ()) };   # Stop the thread that sits in a loop doing C select() calls.
                        cpu::change_number_of_server_hostthreads_to  per_who  0;                     # Stop cpu-server hostthreads.
                        io::change_number_of_server_hostthreads_to   per_who  0;                     # Stop io-server hostthreads.
                        #
                        supporting_hostthreads_are_running := FALSE;
                    };
    esac;
#
fun startup_phase_11_fn _                                                       # Our arg will be at::STARTUP_PHASE_11_START_SUPPORT_HOSTTHREADS
    =
    {
        # Step 1:  Append a marker line to "main.log~" and close it again.
        #          We do this first to try to ensure that the log is
        #          open before anything that might log starts executing.
        #
        fd =  fil::open_for_append "main.log~";
        #
        fil::write (fd, "src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg: startup_phase_11_fn\n");
        fil::flush fd;
        fil::close_output fd;

        # Step 2:  Point the logger at its file.
        #
        fil::set_logger_to (fil::LOG_TO_FILE "mythryl.log");

        # Step 3:  Bring up the cpu/io/io-wait hostthreads.
        #
        start_supporting_hostthreads_if_not_running ();
    };
#
fun shutdown_phase_4_fn _                                                       # Our arg will be at::SHUTDOWN_PHASE_4_STOP_SUPPORT_HOSTTHREADS
    =
    # Shutdown-time counterpart of startup_phase_11_fn:
    # take the supporting hostthreads down again.
    #
    stop_supporting_hostthreads_if_running ();
#####
# Link-time action: start the supporting hostthreads as soon as this package is loaded.
#
my _ = # "my _ =" is needed because only declarations are syntactically legal here.
start_supporting_hostthreads_if_not_running ();
# Register a STARTUP_PHASE_1 function which returns
# our package-level state variables to their initial values.
#
my _ = # Needed because only declarations are syntactically legal here.
at::schedule
(
"thread-scheduler: Clear state vars", # Arbitrary label for debugging displays.
#
[ at::STARTUP_PHASE_1_RESET_STATE_VARIABLES ], # When to run the function.
#
\\ _ = { # Ignored arg is at::STARTUP_PHASE_1_RESET_STATE_VARIABLES
# request_queue := []; # I'm not sure this is needed or even a good idea...
pid := 0; # Zero pid makes is_running() return FALSE.
uninterruptible_scope_mutex := 0; # Zero == not in uninterruptible scope.
need_to_switch_threads_when_exiting_uninterruptible_scope := FALSE;
cached_approximate_time := NULL;
supporting_hostthreads_are_running := FALSE; # So that the next start_supporting_hostthreads_if_not_running() call actually starts them.
fil::current_thread_info__hook := NULL; # This gets set (only) in reset_thread_scheduler() (below).
}
);
# Register the functions which start and stop the supporting hostthreads
# at STARTUP_PHASE_11 and SHUTDOWN_PHASE_4 respectively.
#
my _ =
at::schedule ("microthread-preemptive-scheduler.pkg: start support hostthreads", [ at::STARTUP_PHASE_11_START_SUPPORT_HOSTTHREADS ], startup_phase_11_fn); my _ =
at::schedule ("microthread-preemptive-scheduler.pkg: stop support hostthreads", [ at::SHUTDOWN_PHASE_4_STOP_SUPPORT_HOSTTHREADS ], shutdown_phase_4_fn);
#####
end;
#
fun start_thread_scheduler_timer  new_time_quantum                              # Exported.  Called from all over.
    =
    {
        # Record the calling hostthread as the scheduler hostthread, make sure the
        # supporting hostthreads are up, then install our signal handlers and start
        # the SIGALRM timer which drives timeslicing.
        #
        # ARGUMENT:
        #     new_time_quantum    Requested timeslice length.  If it is not greater
        #                         than tim::zero_time we use default_time_quantum.
        #
        # DEBUGGING OVERRIDE (in place since 2012-10-10 CrT):
        #     The environment variable SIGALRM, if set to something that
        #     multiword_int::from_string can parse, replaces the above:
        #         value >  0:   Timeslice length in milliseconds.
        #         value <= 0:   Do not install handlers or start the timer at all.
        #     If SIGALRM is unset or unparseable we get the normal production behavior.
        #
        scheduler_hostthread := hth::get_hostthread ();
        #
        start_supporting_hostthreads_if_not_running ();
        #
        case (posixlib::getenv "SIGALRM")
            #
            THE rate => {   case (multiword_int::from_string rate)
                                #
                                THE i =>    if (i > 0)
                                                #
                                                install_handlers_and_start_timer (tim::from_milliseconds i);
                                            else
                                                # Do not start SIGALRM at all:
                                                ();
                                            fi;
                                #
                                NULL  =>    install_handlers_and_start_timer (production_quantum new_time_quantum);
                            esac;
                        };
            #
            NULL     =>     install_handlers_and_start_timer (production_quantum new_time_quantum);
        esac;
        #
        ();
    }
    where
        # Production rule for picking the quantum:
        # honor the request if it is positive, else fall back to the default.
        #
        fun production_quantum  requested_quantum
            =
            tim::(<) (tim::zero_time, requested_quantum)
                #
                ??  requested_quantum
                ::  default_time_quantum;

        # Remember 'quantum' in time_quantum, install our signal handlers,
        # and start SIGALRM firing once per 'quantum'.
        #
        fun install_handlers_and_start_timer  quantum
            =
            {   time_quantum := quantum;
                #
                is::set_signal_handler (is::SIGALRM, is::HANDLER alarm_handler);
                is::set_signal_handler (is::SIGUSR1, is::HANDLER alarm_handler);       # Added 2014-08-10 CrT to provide a way for other posix-threads to wake us early from an is::pause call.
                is::set_signal_handler (is::SIGKILL, is::HANDLER kill_handler);        # NOTE(review): POSIX does not let a process catch SIGKILL -- confirm what is::set_signal_handler does with this request.
                #
                frq::set_sigalrm_frequency (THE quantum);
                ();
            };
    end;
#
fun stop_thread_scheduler_timer ()                                              # Exported.
    =
    {
        # Undo start_thread_scheduler_timer:  first switch off the
        # SIGALRM timer, then set every signal we had installed a
        # handler for to is::IGNORE.
        #
        frq::set_sigalrm_frequency NULL;
        #
        apply
            (\\ signal = {  is::set_signal_handler (signal, is::IGNORE);  ();  })
            [ is::SIGALRM,
              is::SIGUSR1,                                                      # Added 2014-08-10 CrT to provide a way for other posix-threads to wake us early from an is::pause call.
              is::SIGKILL
            ];
        #
        ();
    };
#
fun restart_thread_scheduler_timer ()                                           # Exported.
    =
    {
        # Start the timer again using whatever quantum was last stored in time_quantum.
        #
        remembered_quantum =  *time_quantum;
        #
        start_thread_scheduler_timer  remembered_quantum;
    };
# Reset various pieces of state
#
fun reset_thread_scheduler  running                                             # Exported.  This fn is called (only) below at link time
    =                                                                           # and in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
    {                                                                           # and in   src/lib/src/lib/thread-kit/src/glue/thread-scheduler-control-g.pkg
                                                                                # as part of start-of-world/end-of-world type stuff.
        # Return the scheduler's state to a known baseline.
        #
        # ARGUMENT:
        #     running    TRUE:   Give the logger a way to query the current thread.
        #                FALSE:  Clear that logger hook, and seed the run queue
        #                        with (itt::error_thread, itt::error_fate).
        #
        set_current_microthread  itt::error_thread;                             # Initialize current_thread register to a valid value.
                                                                                # We don't expect this to be used for anything, but we
                                                                                # like not segfaulting if someone references the register.

        no_runnable_threads_left__hook          :=  bogus_void_fate;
        thread_scheduler_shutdown_hook          :=  bogus_shutdown_fate;
        run_next_runnable_thread__xu__hook      :=  default_scheduler_fate;

        cached_approximate_time                 :=  NULL;

        rwq::clear_queue_to_empty  foreground_run_queue;
        rwq::clear_queue_to_empty  background_run_queue;

        fil::current_thread_info__hook                                          # Give the logger access to current thread id.
            :=                                                                  # We cannot set this non-NULL until set_current_microthread call above -- we'll segfault if we do.
            if (not running)
                #
                NULL;
            else
                THE (\\ () = {  (get_current_microthread()) -> thread;
                                #
                                thread -> itt::MICROTHREAD { thread_id, name, task, ... };
                                task   -> itt::APPTASK     { task_id, ... };
                                #
                                (thread_id, name, task_id);
                             }
                    );
            fi;

        if (not running)
            #
            push_into_run_queue (itt::error_thread, itt::error_fate);
        fi;
    };
# Link-time action: put the scheduler into its "not running" baseline state.
#
my _ = # "my _ =" because only declarations are syntactically legal here.
reset_thread_scheduler FALSE;
##################################################
# This section contains code for
# inter-hostthread communication per
#
#     src/lib/std/src/hostthread/template-hostthread.pkg
#
fun is_running ()
    =
    {
        # We count as running only if a nonzero pid has been recorded AND it
        # is the pid of the process we are executing in right now.
        # This way, if the heap gets dumped to disk and then reloaded,
        # is_running() will (correctly) return FALSE, even though pid
        # may not have gotten zeroed.
        #
        recorded_pid =  *pid;
        #
        recorded_pid != 0   and   recorded_pid == wxp::get_process_id ();
    };
# 'mutex' is held around every update of request_queue in the functions below.
# 'condvar' is broadcast after each such update and is what
# block_until_inter_hostthread_request_queue_is_nonempty() waits on.
#
mutex = hth::make_mutex ();
condvar = hth::make_condvar ();
# Debugging aid: report the integer form of our mutex on stdout, in the log and in the ramlog.
# NOTE(review): the printf format string has no trailing newline -- confirm that is intended.
#
my _ = printf "mutex d=%d -- microthread_preemptive_scheduler" (hth::mutex_to_int mutex);
my _ = log::note {. sprintf "mutex d=%d -- microthread_preemptive_scheduler" (hth::mutex_to_int mutex); };
my _ = log::note_in_ramlog {. sprintf "mutex d=%d -- microthread_preemptive_scheduler" (hth::mutex_to_int mutex); };
#
# Server-side half of 'do': simply run the submitted thunk.
#
fun do_thunk (thunk: Void -> Void) # Internal fn -- will execute in context of server hostthread.
=
thunk ();
#
# Submit 'thunk' for execution by the scheduler hostthread:
# push DO_THUNK thunk onto request_queue under 'mutex', broadcast
# 'condvar', then wake the scheduler hostthread in case it is paused.
# The thunk is not run here.
#
fun do (thunk: Void -> Void) # Exported -- will execute in context of client hostthread.
=
{
log::note_in_ramlog {. sprintf "mps::do/TOP: uninterruptible_scope_mutexxx=%d acquiring mutex -- microthread-preemptive-scheduler.pkg" *uninterruptible_scope_mutex; };
log::note_in_ramlog {. sprintf "mps::do/AAA: uninterruptible_scope_mutexxx=%d acquiring mutex -- microthread-preemptive-scheduler.pkg" *uninterruptible_scope_mutex; };
hth::acquire_mutex mutex;
#
request_queue := (DO_THUNK thunk) ! *request_queue; # New requests go on the FRONT of the list; the consumer reverses the list to recover submission order.
#
hth::release_mutex mutex;
hth::broadcast_condvar condvar;
# NOTE(review): the next line reads *request_queue after 'mutex' has been released, so the
# logged length may already be stale -- presumably acceptable for a debug message; confirm.
log::note_in_ramlog {. sprintf "mps::do/DDD: uninterruptible_scope_mutexxx=%d broadcasting request_queue len %d-- microthread-preemptive-scheduler.pkg" *uninterruptible_scope_mutex (list::length *request_queue); };
log::note_in_ramlog {. sprintf "mps::do/ZZZ: uninterruptible_scope_mutexxx=%d done -- microthread-preemptive-scheduler.pkg" *uninterruptible_scope_mutex; };
wake_scheduler_hostthread_if_paused (); # Otherwise if scheduler hostthread is is::paused()d for lack of anything to do it may not respond until the next 50HZ SIGALRM -- wasting up to 20ms, AKA twenty million nanoseconds.
};
#
# Server-side half of 'echo': hand the request's 'what' field to its 'reply' function.
#
fun do_echo (r: Do_Echo) # Internal fn -- will execute in context of server hostthread.
=
r.reply r.what;
#
fun echo  (request: Do_Echo)                                                    # Exported -- will execute in context of client hostthread.
    =
    {
        # Submit an echo request to the scheduler hostthread:
        # push DO_ECHO request onto request_queue under 'mutex',
        # broadcast 'condvar', then wake the scheduler hostthread.
        # The reply is delivered later, on the server side, by do_echo.
        #
        # We make no function calls while holding 'mutex' -- see the
        # Voice Of Experience comment in
        # get_any_new_inter_hostthread_requests__iu.
        #
        hth::acquire_mutex mutex;
            #
            request_queue :=  (DO_ECHO request)  !  *request_queue;            # New requests go on the FRONT of the list; the consumer reverses the list to recover submission order.
            #
        hth::release_mutex mutex;
        hth::broadcast_condvar condvar;
        #
        wake_scheduler_hostthread_if_paused ();                                 # Same as in 'do':  otherwise a paused scheduler hostthread may not notice the request until the next SIGALRM.
    };
#
# Drain request_queue and turn each pending inter-hostthread request into
# a run_thunk_soon__iu thread on the foreground run queue, preserving
# submission order.
#
# May be called either inside or outside an uninterruptible scope:
# if *uninterruptible_scope_mutex is zero we set it to 1 for the duration
# and reset it to 0 afterward; otherwise we leave it untouched.
#
fun add_inter_hostthread_request_handler_thunks_to_run_queue' ()
=
if (*uninterruptible_scope_mutex == 0)
#
log::uninterruptible_scope_mutex := 1; # With the current implementation, there is no way *uninterruptible_scope_mutex can become nonzero between our test and call here,
# # because we run all microthreads in a single hostthread which switches microthreads only at the start of a function call.
queue_inter_hostthread_requests__iu (get_any_new_inter_hostthread_requests__iu ()); # Originally we wrapped this in run_thunk_soon__iu {. ... } but that opens the door to overlapping execution and possible loss of request ordering, which could be bad.
#
log::uninterruptible_scope_mutex := 0;
else
# Caller is already in uninterruptible scope, so just do the work.
queue_inter_hostthread_requests__iu (get_any_new_inter_hostthread_requests__iu ());
fi
where
#
# Atomically take everything currently in request_queue (leaving it empty)
# and return it in submission order, oldest request first.
#
fun get_any_new_inter_hostthread_requests__iu ()
=
{
# Voice Of Experience:
# We need to be very careful not to allow the thread-scheduler
# to switch to another microthread while this microthread is
# holding 'mutex', otherwise that other microthread could easily
# deadlock attempting to also acquire 'mutex'.
#
# The thread scheduler gets called by the heapcleaner, which is
# invoked only at the start of a function, so at present we just
# avoid calling any functions while holding 'mutex'.
# -- 2012-11-04 CrT.
hth::acquire_mutex mutex;
#
new_requests = *request_queue;
#
request_queue := [];
#
hth::release_mutex mutex;
hth::broadcast_condvar condvar;
result = case new_requests # Reverse new_requests to restore original request ordering.
# # In practice we almost always have one of the first two cases here.
[] => []; # It is ok if new_requests == [].
[x] => [x]; # Save time by not calling list::reverse.
_ => list::reverse new_requests; # Can't avoid doing a real list::reverse.
esac;
result;
};
#
# Walk the request list front to back, queueing one handler thread per request.
#
fun queue_inter_hostthread_requests__iu (request ! rest)
=>
{
queue_inter_hostthread_request__iu request;
#
queue_inter_hostthread_requests__iu rest;
}
where
# Wrap one request in a thunk which calls the matching do_* handler, and queue that thunk.
fun queue_inter_hostthread_request__iu (DO_ECHO r) => run_thunk_soon__iu {. do_echo r; };
queue_inter_hostthread_request__iu (DO_THUNK t) => run_thunk_soon__iu {. do_thunk t; };
end;
end;
# queue_inter_hostthread_requests__iu [] => ();
queue_inter_hostthread_requests__iu [] => {
();
};
end;
end;
# Link-time action: publish the function above through its hook refcell.
#
my _ =
add_inter_hostthread_request_handler_thunks_to_run_queue__hook # A little hack that lets us call add_inter_hostthread_request_handler_thunks_to_run_queue' ()
:= # from most anywhere in the file without the whole file collapsing
(THE add_inter_hostthread_request_handler_thunks_to_run_queue'); # into mutual recursion.
#
# Block the calling hostthread on 'condvar' for as long as
# inter_hostthread_request_queue_is_empty() reports true.
# Returns with 'mutex' released.
#
fun block_until_inter_hostthread_request_queue_is_nonempty () # Exported. This gives no_runnable_threads_left__fate in src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg.
= # a graceful way to block until we have something to do.
{
# Unlike the above functions, we do not need to
# worry about calling a function while holding
# the mutex, because we are only called when
# there are no other ready-to-run microthreads.
#
hth::acquire_mutex mutex;
#
for (inter_hostthread_request_queue_is_empty()) { # Re-test the condition after every wakeup.
#
hth::wait_on_condvar (condvar, mutex);
};
#
hth::release_mutex mutex;
hth::broadcast_condvar condvar;
};
# REMAINING WORK:
# I put
#
# queue_inter_hostthread_requests (get_any_new_inter_hostthread_requests());
#
# in alarm_handler() because that is the only place in this file
# which can guarantee a response in a bounded amount of time,
# but we probably also want to opportunistically do the same in
# some of our other thread-switch functions.
#
# When both run queues are empty, we should block by calling
# block_until_inter_hostthread_request_queue_is_nonempty()
# At present this is done in
# no_runnable_threads_left__fate
# in
#     src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg
# -- there may be other places we should be doing this... ?
# -- 2012-07-21 CrT
#
# End of hostthread-support section
##################################################
};
end;
################################################################################
# Note[1] 2015-05-02 CrT hostthreads_to_start = 8 issue -- XXX SUCKO FIXME
#
# Originally I had
# hostthreads_to_start = max (1, hth::get_cpu_core_count() - 1);
# inspired by the Linux kernel maintainers dictum that one should have
# no more hostthreads than cores.
#
# But the xclient-unit-test.pkg failed on a two-core 2014 Intel Atom
# box (which turned out to compile Mythryl almost twice as fast as my
# six-core 2010 AMD Phenom(tm) II X6 1090T box despite matching
# 3.2GHz clocks -- the Atom had six times the cache size!) because
# it resulted in only one I/O hostthread being started up, whereas
# at least two are needed to service the X socket, one to block
# waiting for input and one to send output. With only one I/O pthread
# xclient-unit-test.pkg deadlocks silently because with the sole I/O
# pthread blocked waiting for X server input which never comes, the
# I/O calls to send commands to the X server queue up and never get
# serviced.
#
# I then tried hardwiring hostthreads_to_start at 16, which fell afoul of
# src/c/mythryl-config.h:#define MAX_HOSTTHREADS 32
# src/c/main/runtime-state.c:Hostthread* hostthread_table__global[ MAX_HOSTTHREADS ];
# hardwiring the total available hostthreads to 32, resulting in table
# exhaustion.
#
# So for the moment I'm hardwiring hostthreads_to_start at 8.
#
# In the long run we need to
#
# 1) Make hostthread_table__global[] dynamically expandable.
#
# 2) Dynamically increase the number of I/O threads in proportion to
# the number of open sockets, since in general we'll need one
# sacrificial I/O thread blocked waiting to read for each socket.
#
# (Obviously, in special cases with a high socket count we might want
# to use a separate select()-based mechanism to read them all with a
# single I/O hostthread. But various versions of the fundamental
# problem above will presumably still remain, which will still require
# dynamic scaling of I/O hostthread count.)
#
# Pending an actual fix, it would be nice to at least track the number
# of open sockets and issue a warning or error message if it exceeds
# 6 or so.
#
# For the moment, however, few Mythryl processes are likely to have
# more than 2-3 sockets open, so this isn't a super pressing issue.