## microthread.pkg
#
# This is the main thread package.
#
# (It used to be called just "thread.pkg", but grepping for
# 'thread' yielded too many false positives. ;-)
# Compiled by:
#
src/lib/std/standard.lib# See also higher-level functionality implemented in:
#
src/lib/src/lib/thread-kit/src/core-thread-kit/task-junk.pkgstipulate
package fat = fate; # fate is from
src/lib/std/src/nj/fate.pkg package itt = internal_threadkit_types; # internal_threadkit_types is from
src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg package mop = mailop; # mailop is from
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg package tsr = thread_scheduler_is_running; # thread_scheduler_is_running is from
src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler-is-running.pkg package mps = microthread_preemptive_scheduler; # microthread_preemptive_scheduler is from
src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg #
call_with_current_fate = fat::call_with_current_fate;
switch_to_fate = fat::switch_to_fate;
herein
package microthread: (weak)
api {
include api Microthread; # Microthread is from
src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.api default_exception_handler: Ref (Exception -> Void);
reset_thread_package: { running: Bool } -> Void;
}
{
exception THREAD_SCHEDULER_NOT_RUNNING;
#
Microthread == itt::Microthread;
Apptask == itt::Apptask;
Condition_Variable == itt::Condition_Variable;
Condvar_State == itt::Condvar_State;
Mailop(X) = itt::Mailop(X);
default_microthread = itt::default_thread;
package state {
#
State = ALIVE
| SUCCESS
| FAILURE
| FAILURE_DUE_TO_UNCAUGHT_EXCEPTION
# No Exception value here. This is the crucial difference from itt::state::State,
; # which makes this State an equality type -- much more convenient for client code.
};
fun internal_state_to_external_state (itt::state::ALIVE ) => state::ALIVE;
internal_state_to_external_state (itt::state::SUCCESS ) => state::SUCCESS;
internal_state_to_external_state (itt::state::FAILURE ) => state::FAILURE;
internal_state_to_external_state (itt::state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION _) => state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION;
end;
stipulate # NB: itt::default_task is task_id #0, and is treated specially by the thread scheduler.
# # itt::default_thread is thread_id #0, but is NOT treated specially by the thread scheduler.
next_task_id = REF 1; # We could combine these two counters, but why be picayune?
next_thread_id = REF 3; # We start at 3 because itt::thread_scheduler_temporary_thread is #1 and itt::error_thread is #2.
fun make_condition_variable ()
=
CONDITION_VARIABLE (REF (CONDVAR_IS_NOT_SET []));
herein
fun reset_thread_package { running }
=
{ next_task_id := 1;
next_thread_id := itt::first_free_thread_id;
#
mps::reset_thread_scheduler running;
};
fun exception_handler (x: Exception)
=
();
default_exception_handler
=
REF exception_handler;
fun make_apptask task_name
=
{ (*next_task_id) -> task_id;
next_task_id := task_id+1; # This sequence is (only) safe because we are only called when thread-switching is disabled.
APPTASK
{ task_name,
task_id,
task_state => REF itt::state::ALIVE,
alive_threads_count => REF 0,
task_condvar => make_condition_variable (),
cleanup_task => REF NULL
};
};
default_task = make_apptask "default task";
fun make_microthread__iu name task
=
{ (*next_thread_id) -> thread_id;
next_thread_id := thread_id+1; # This sequence is (only) safe because we are only called when thread-switching is disabled.
MICROTHREAD
{ name,
thread_id,
task,
#
didmail => REF FALSE,
state => REF itt::state::ALIVE,
#
exception_handler => REF *default_exception_handler,
properties => REF [],
done_condvar => make_condition_variable ()
};
};
end;
fun get_task's_id (APPTASK { task_id, ... } )
=
task_id;
fun get_task's_name (APPTASK { task_name, ... } )
=
task_name;
fun get_task's_state (APPTASK { task_state, ... } )
=
internal_state_to_external_state *task_state;
fun get_task's_alive_threads_count (APPTASK { alive_threads_count, ... } )
=
*alive_threads_count;
fun same_task ( APPTASK { task_id => id1, ... },
APPTASK { task_id => id2, ... }
)
=
id1 == id2;
fun compare_task ( APPTASK { task_id => id1, ... },
APPTASK { task_id => id2, ... }
)
=
int::compare (id1, id2);
fun kill_task { success, task => APPTASK { task_id, task_state, task_condvar, ... } }
=
if (task_id > 0) # Default task cannot be killed.
#
mps::assert_not_in_uninterruptible_scope "kill_task";
log::uninterruptible_scope_mutex := 1;
#
case *task_state
#
itt::state::ALIVE
=>
{ task_state := (success ?? itt::state::SUCCESS # Set it to non-ALIVE.
:: itt::state::FAILURE);
#
# # We do NOT zero alive_threads_count here; we let it decrement naturally
# # as the various threads in the task discover they are dead.
#
mop::set_condvar__iu task_condvar; # Tell the world it is now non-ALIVE.
# NB: We are very careful to set done_condvar EXACTLY once, immediately AFTER exiting ALIVE state.
};
_ => ();
esac;
#
log::uninterruptible_scope_mutex := 0;
fi;
fun same_thread ( MICROTHREAD { thread_id => id1, ... },
MICROTHREAD { thread_id => id2, ... }
)
=
id1 == id2;
fun compare_thread ( MICROTHREAD { thread_id => id1, ... },
MICROTHREAD { thread_id => id2, ... }
)
=
int::compare (id1, id2);
fun hash_thread (MICROTHREAD { thread_id, ... } )
=
unt::from_int thread_id;
fun get_thread's_name (MICROTHREAD { name, ... } )
=
name;
fun get_thread's_id (MICROTHREAD { thread_id, ... } )
=
thread_id;
get_thread's_id_as_string
=
itt::get_thread's_id_as_string;
fun get_thread's_state (MICROTHREAD { state, ... } )
=
internal_state_to_external_state *state;
fun get_thread's_task (MICROTHREAD { task, ... } )
=
task;
fun kill_thread { success, thread => (MICROTHREAD { done_condvar, state, task, ... }) } # If given thread is alive, kill it. Written 2012-08-11 CrT -- I hope this works. :-)
=
{ task -> APPTASK { task_id, task_condvar, task_state, alive_threads_count, ... };
#
mps::assert_not_in_uninterruptible_scope "kill_thread";
log::uninterruptible_scope_mutex := 1;
#
case *state
#
itt::state::ALIVE
=>
{ state := (success ?? itt::state::SUCCESS :: itt::state::FAILURE); # Set it to non-ALIVE.
#
mop::set_condvar__iu done_condvar; # Tell the world the thread is now non-ALIVE.
# NB: We are very careful to set done_condvar EXACTLY once, immediately AFTER exiting ALIVE state.
alive_threads_count := *alive_threads_count - 1; # One less live thread in this task.
if (not success)
#
case *task_state
#
itt::state::ALIVE
=>
if (task_id > 0) # Default task never exits ALIVE state.
#
task_state := itt::state::FAILURE; # If a thread ends in FAILURE, its task ends in FAILURE also.
#
mop::set_condvar__iu task_condvar; # Tell the world the task is now non-ALIVE.
fi;
_ => ();
esac;
fi;
case *task_state
#
itt::state::ALIVE
=>
if (task_id > 0) # Default task never exits ALIVE state.
#
task_state := itt::state::SUCCESS; # All threads in task ended in SUCCESS, so task ends in SUCCESS.
#
mop::set_condvar__iu task_condvar; # Tell the world the task is now non-ALIVE.
fi;
_ => ();
esac;
};
_ => (); # Killing a dead thread is a no-op.
esac;
#
log::uninterruptible_scope_mutex := 0;
};
fun mark_thread_as_done_and_dispatch_next_thread (MICROTHREAD { state, done_condvar, task, ... }, final_state)
=
{ task -> APPTASK { task_id, task_condvar, task_state, alive_threads_count, ... };
#
mps::assert_not_in_uninterruptible_scope "mark_thread_as_done_and_dispatch_next_thread";
log::uninterruptible_scope_mutex := 1;
#
case *state
#
itt::state::ALIVE
=>
{ state := final_state; # Set it to non-ALIVE.
#
mop::set_condvar__iu done_condvar; # Tell the world it is now non-ALIVE.
alive_threads_count := *alive_threads_count - 1; # One less live thread in this task.
case final_state
#
itt::state::FAILURE
=>
if (task_id > 0) # Default task never dies.
#
task_state := itt::state::FAILURE; # Task fails if any of its threads fail.
#
mop::set_condvar__iu task_condvar; # Tell the world the task is now non-ALIVE.
fi;
itt::state::SUCCESS
=>
case *alive_threads_count
#
0 => case *task_state
#
itt::state::ALIVE
=>
{ task_state := itt::state::SUCCESS; # Task succeeds if all of its threads succeeded.
#
mop::set_condvar__iu task_condvar; # Tell the world the task is now non-ALIVE.
};
_ => ();
esac;
_ => ();
esac;
_ => { printf "Impossible case -- microthread.pkg\n";
winix__premicrothread::process::exit(1);
};
esac;
};
_ => ();
esac;
#
mps::dispatch_next_thread__xu__noreturn ();
};
fun do_handler (MICROTHREAD { exception_handler, state, done_condvar, task, ... }, exn)
=
{ task -> APPTASK { task_id, task_condvar, task_state, alive_threads_count, ... };
#
mps::assert_not_in_uninterruptible_scope "do_handler (microthread.pkg)";
log::uninterruptible_scope_mutex := 1;
#
case *state
#
itt::state::ALIVE
=>
{ state := itt::state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION exn; # Set it to non-ALIVE.
#
mop::set_condvar__iu done_condvar; # Tell the world the thread is now non-ALIVE.
alive_threads_count := *alive_threads_count - 1; # One less live thread in this task.
if (task_id > 0) # Default task never dies.
#
case *task_state
#
itt::state::ALIVE
=>
{ task_state := itt::state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION exn; # Thread death-by-uncaught-exception *always* kills the task.
#
mop::set_condvar__iu task_condvar; # Tell the world the task is now non-ALIVE.
};
_ => ();
esac;
fi;
};
_ => ();
esac;
#
log::uninterruptible_scope_mutex := 0;
*exception_handler exn
except
_ = ();
};
Make_Thread_Args = THREAD_NAME String # Future-proofing: we can add additional options (priority?) here in future without breaking existing code.
| THREAD_TASK Apptask
# New thread will be a member of this task (instead of defaulting to same task as parent thread).
;
fun run_thread__xu new_thread f x
=
fate::call_with_current_fate
(\\ old_fate
=
{
mps::enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread, old_fate };
log::uninterruptible_scope_mutex := 0;
{ f x;
#
}
except
ex = do_handler (new_thread, ex);
mark_thread_as_done_and_dispatch_next_thread (new_thread, itt::state::SUCCESS);
}
);
fun make_thread' args f x
=
{
(mps::get_current_microthread ())
->
MICROTHREAD { task, ... };
makethread { args, name => "", task }; # Default to creating new thread in current task with empty name.
}
where
fun makethread { args => (THREAD_NAME n ! rest), name, task }
=>
makethread { args => rest, name => n, task }; # Note caller-supplied non-default name for new thread.
makethread { args => (THREAD_TASK t ! rest), name, task }
=>
makethread { args => rest, name, task => t }; # Note caller-supplied non-default task for new thread.
makethread { args => [], name, task }
=>
{
task -> APPTASK { alive_threads_count, ... };
mps::assert_not_in_uninterruptible_scope "make_thread";
log::uninterruptible_scope_mutex := 1;
#
alive_threads_count := *alive_threads_count + 1; # Task now has one more live thread.
new_thread = make_microthread__iu name task;
# (mps::get_current_microthread ()) -> MICROTHREAD { name => thread_name, thread_id, task => old_task, ... };
# old_task -> APPTASK { task_name => old_task_name, task_id => old_task_id, ... };
# task -> APPTASK { task_name, task_id, ... };
# new_thread -> MICROTHREAD { name => new_thread_name, thread_id => new_thread_id, ... };
# log::note {. sprintf "make_thread' creating thread %d:%d(%s)" new_thread_id task_id new_thread_name; };
# file::print "make_thread' "; mps::print_thread_scheduler_state (); file::print "\n";
# file::print "make_thread' called by '"; file::print thread_name; file::print "'=="; mps::print_int 2 thread_id; file::print " (in task '"; file::print old_task_name; file::print "'="; mps::print_int 2 old_task_id; file::print ") ";
# file::print "CREATING THREAD '"; file::print new_thread_name; file::print "'$="; mps::print_int 2 new_thread_id; file::print " (in task '"; file::print task_name; file::print "'="; mps::print_int 2 task_id; file::print ")\n";
#
run_thread__xu new_thread f x;
new_thread; # Returns our result when (old_thread, old_fate) next gets scheduled to run.
};
end;
end;
# Eventually, above should use make_isolated_fate, but that is too slow at present.
# The eventual code would look like:
#
# fun make_thread' args f x
# =
# id
# where
# log::uninterruptible_scope_mutex := 1;
# id = make_microthread__iu name itt::default_task;
# fun thread ()
# =
# ( (f x)
# except
# ex = do_handler (id, ex);
#
# mark_thread_as_done_and_dispatch_next_thread (id, itt::state::SUCCESS);
# );
#
# fate::call_with_current_fate
# (\\ parent_fate
# =
# ( mps::run_thread (id, parent_fate);
# log::uninterruptible_scope_mutex := 0;
# fate::throw (fate::make_isolated_fate thread) ();
# )
# );
# end;
#
fun make_thread name f # Convenience fn for the most common case.
=
make_thread' [ THREAD_NAME name ] f ();
fun make_task name threads
=
{ task = make_apptask name; # Make the task record which will be shared by our new threads.
#
fun make_and_queue_thread__iu (threadname: String, threadbody: Void -> Void)
=
{ microthread = make_microthread__iu threadname task; # Make the Microthread record for the new thread.
#
threadfate # Make the fate for the new thread.
=
call_with_current_fate
(\\ fate
=
{ call_with_current_fate (\\ fate' = switch_to_fate fate fate');
#
threadbody () # This block of code becomes 'threadfate'.
except #
ex = do_handler (microthread, ex); #
#
mark_thread_as_done_and_dispatch_next_thread #
(microthread, itt::state::SUCCESS); #
}
);
mps::push_into_run_queue (microthread, threadfate); # Schedule the new thread to run.
};
task -> APPTASK { alive_threads_count, ... };
mps::assert_not_in_uninterruptible_scope "make_task";
log::uninterruptible_scope_mutex := 1; # We want all threads in task queued before any have a chance to run, to avoid weird
# # race conditions where one throws an exception before others get created (say).
apply make_and_queue_thread__iu threads;
#
alive_threads_count := list::length threads;
#
log::uninterruptible_scope_mutex := 0;
task;
};
fun thread_done__mailop (MICROTHREAD { done_condvar, ... } )
=
mop::wait_on_condvar' done_condvar; # This and the next are the only uses of wait_on_condvar'
fun task_done__mailop ( APPTASK { task_condvar, ... } )
=
mop::wait_on_condvar' task_condvar; # This and the prev are the only uses of wait_on_condvar'
fun get_current_microthread ()
=
if (tsr::thread_scheduler_is_running()) mps::get_current_microthread ();
else raise exception THREAD_SCHEDULER_NOT_RUNNING;
fi;
fun get_current_microthread's_name ()
=
get_thread's_name (get_current_microthread ())
except
THREAD_SCHEDULER_NOT_RUNNING
=
"[no thread]";
#
# When microthread_preemptive_scheduler is not running
# get_thread() returns garbage XXX BUGGO FIXME
# and using that result will SEGV us.
#
# We return a dummy value here (rather
# than letting the exception propagate)
# for the convenience of logger.pkg
# logging.
my _ =
log::get_current_microthread's_name__hook := get_current_microthread's_name;
fun get_current_microthread's_id ()
=
{
(get_current_microthread ())
->
MICROTHREAD { thread_id, ... };
thread_id;
}
except
THREAD_SCHEDULER_NOT_RUNNING
=
0;
#
# See comments to get_current_microthread's_name (), above.
fun thread_exit { success }
=
{ (get_current_microthread ())
->
# (tid as MICROTHREAD { properties, ... } );
(tid as MICROTHREAD { properties, name, ... } );
# file::print "\n\n====================\nthread_exit("; file::print name; file::print "...\n";
# (mps::get_current_microthread ()) -> MICROTHREAD { name => thread_name, thread_id, task, ... };
# task -> APPTASK { task_name, task_id, ... };
# file::print "thread_exit "; mps::print_thread_scheduler_state (); file::print "\n";
# file::print "thread_exit for "; file::print thread_name; file::print "'="; mps::print_int 2 thread_id; file::print " (in task '"; file::print task_name; file::print "'="; mps::print_int 2 task_id; file::print ") "; file::print ")\n";
# file::print "\n====================\n\n";
mark_thread_as_done_and_dispatch_next_thread
( tid,
success ?? itt::state::SUCCESS
:: itt::state::FAILURE
);
};
fun yield ()
=
fate::call_with_current_fate
(\\ fate
=
{
mps::assert_not_in_uninterruptible_scope "yield (microthread.pkg)";
log::uninterruptible_scope_mutex := 1;
mps::yield_to_next_thread__xu fate;
}
);
fun get_exception_that_killed_thread (MICROTHREAD { state => REF (itt::state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION x), ... } ) => THE x;
get_exception_that_killed_thread _ => NULL;
end;
fun get_exception_that_killed_task (APPTASK { task_state => REF (itt::state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION x), ... } ) => THE x;
get_exception_that_killed_task _ => NULL;
end;
# Thread-local data
#
stipulate
fun make_property ()
=
{ exception EXCEPTION X;
fun cons (a, l)
=
EXCEPTION a ! l;
fun peek [] => NULL;
peek (EXCEPTION a ! _) => THE a;
peek (_ ! l) => peek l;
end;
fun delete [] => [];
delete (EXCEPTION a ! r) => r;
delete (x ! r) => x ! delete r;
end;
{ cons, peek, delete };
};
fun make_bool ()
=
{ exception EXCEPTION;
fun peek [] => FALSE;
peek (EXCEPTION ! _) => TRUE;
peek (_ ! l) => peek l;
end;
fun set (l, flag)
=
set (l, [])
where
fun set ([], _) => if flag EXCEPTION ! l; else l;fi;
set (EXCEPTION ! r, xs) => if flag l; else list::reverse_and_prepend (xs, r);fi;
set (x ! r, xs) => set (r, x ! xs);
end;
end;
{ set, peek };
};
fun get_properties ()
=
{ (get_current_microthread ())
->
MICROTHREAD { properties, ... };
properties;
};
herein
fun make_per_thread_property (init: Void -> Y)
=
{ my { peek, cons, delete }
=
make_property ();
fun peek_fn ()
=
peek (*(get_properties ()));
fun get_f ()
=
{ h = get_properties ();
case (peek *h)
THE b => b;
NULL => { b = init ();
h := cons (b, *h);
b;
};
esac;
};
fun clr_f ()
=
{ h = get_properties ();
h := delete *h;
};
fun set_fn x
=
{ h = get_properties ();
h := cons (x, delete *h);
};
{ peek => peek_fn,
get => get_f,
clear => clr_f,
set => set_fn
};
};
fun make_boolean_per_thread_property ()
=
{ my { peek, set }
=
make_bool ();
fun get_f ()
=
peek(*(get_properties ()));
fun set_f flag
=
{ h = get_properties ();
h := set (*h, flag);
};
{ get => get_f,
set => set_f
};
};
end; # stipulate
};
end;