PreviousUpNext

15.4.927  src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg

## microthread.pkg
#
# This is the main thread package.
#
# (It used to be called just "thread.pkg", but grepping for
# 'thread' yielded too many false positives. ;-)

# Compiled by:
#     src/lib/std/standard.lib

# See also higher-level functionality implemented in:
#     src/lib/src/lib/thread-kit/src/core-thread-kit/task-junk.pkg

stipulate
    package fat =  fate;                                                                # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                                            # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package mop =  mailop;                                                              # mailop                                is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
    package tsr =  thread_scheduler_is_running;                                         # thread_scheduler_is_running           is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler-is-running.pkg
    package mps =  microthread_preemptive_scheduler;                                    # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    call_with_current_fate =  fat::call_with_current_fate;
    switch_to_fate         =  fat::switch_to_fate;
herein

    package microthread: (weak)
                            api {
                                include api Microthread;                                # Microthread                           is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.api
                                default_exception_handler:  Ref (Exception -> Void);
                                reset_thread_package: { running: Bool } -> Void;
                            }
    {
        exception THREAD_SCHEDULER_NOT_RUNNING;                                         # Raised by get_current_microthread() when tsr::thread_scheduler_is_running() reports FALSE.
        #

        # Re-export the core thread types from internal_threadkit_types.
        # The constructors (MICROTHREAD, APPTASK, CONDITION_VARIABLE, ...)
        # are used unqualified throughout the rest of this package.
        #
        Microthread         ==  itt::Microthread;
        Apptask             ==  itt::Apptask;
        Condition_Variable  ==  itt::Condition_Variable;
        Condvar_State       ==  itt::Condvar_State;

        Mailop(X) = itt::Mailop(X);                                                     # Plain type synonym for the internal mailop type.

        default_microthread = itt::default_thread;                                      # Alias for the predefined thread record in internal_threadkit_types.

        # Client-visible thread/task life-cycle states.
        # This mirrors itt::state::State constructor-for-constructor, except that
        # FAILURE_DUE_TO_UNCAUGHT_EXCEPTION carries no payload here;  see
        # internal_state_to_external_state for the mapping.
        #
        package state {
            #
            State = ALIVE
                  | SUCCESS
                  | FAILURE
                  | FAILURE_DUE_TO_UNCAUGHT_EXCEPTION                                   # No Exception value here.  This is the crucial difference from itt::state::State,
                  ;                                                                     # which makes this State an equality type -- much more convenient for client code.
        };

        fun internal_state_to_external_state  (itt::state::ALIVE                              ) =>  state::ALIVE;
            internal_state_to_external_state  (itt::state::SUCCESS                            ) =>  state::SUCCESS;
            internal_state_to_external_state  (itt::state::FAILURE                            ) =>  state::FAILURE;
            internal_state_to_external_state  (itt::state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION _) =>  state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION;
        end;

        stipulate                                                                       # NB: itt::default_task   is   task_id #0, and is     treated specially by the thread scheduler.
            #                                                                           #     itt::default_thread is thread_id #0, but is NOT treated specially by the thread scheduler.
            next_task_id   = REF 1;                                                     # We could combine these two counters, but why be picayune?
            next_thread_id = REF 3;                                                     # We start at 3 because itt::thread_scheduler_temporary_thread is #1 and itt::error_thread is #2.

            # Return a fresh condition variable in the not-yet-set
            # state with an empty list attached to that state.
            #
            fun make_condition_variable ()
                =
                CONDITION_VARIABLE (REF (CONDVAR_IS_NOT_SET []));
        herein

            # Rewind the task-id and thread-id counters to their starting
            # values, then reset the thread scheduler, passing 'running'
            # straight through to mps::reset_thread_scheduler.
            #
            # NOTE(review): default_task (below) is built with make_apptask at
            # package-initialization time and so already owns task id 1;  after
            # this reset the next make_apptask call will hand out id 1 again.
            # Confirm whether that collision matters to same_task/compare_task.
            #
            fun reset_thread_package  { running }
                =
                {   next_task_id   :=  1;
                    next_thread_id :=  itt::first_free_thread_id;
                    #
                    mps::reset_thread_scheduler  running;
                };

            # Do-nothing handler:  ignores the exception it is given.
            #
            fun exception_handler (x:  Exception)
                =
                ();

            # Mutable cell holding the handler which make_microthread__iu
            # copies into each newly created thread.  It starts out as the
            # do-nothing handler above;  it is a REF, so code with access
            # to it can install a different handler for future threads.
            #
            default_exception_handler
                =
                REF exception_handler;

            fun make_apptask  task_name
                =
                { (*next_task_id) ->  task_id;
                    next_task_id  :=  task_id+1;                                        # This sequence is (only) safe because we are only called when thread-switching is disabled.

                    APPTASK
                      { task_name,
                        task_id,
                        task_state          =>  REF itt::state::ALIVE,
                        alive_threads_count =>  REF 0,
                        task_condvar        =>  make_condition_variable (),
                        cleanup_task        =>  REF NULL        
                      };
                };

            default_task = make_apptask  "default task";                                # Built once at package-initialization time;  consumes the first task id handed out by make_apptask.

            fun make_microthread__iu  name  task
                =
                { (*next_thread_id) -> thread_id;
                    next_thread_id  := thread_id+1;                                     # This sequence is (only) safe because we are only called when thread-switching is disabled.

                    MICROTHREAD
                      { name,
                        thread_id,
                        task,
                        #
                        didmail           =>  REF FALSE,
                        state             =>  REF itt::state::ALIVE,
                        #
                        exception_handler =>  REF *default_exception_handler,
                        properties        =>  REF [],
                        done_condvar      =>  make_condition_variable ()
                      };
                };

        end;

        fun get_task's_id (APPTASK { task_id, ... } )
            =
            task_id;

        fun get_task's_name (APPTASK { task_name, ... } )
            =
            task_name;

        fun get_task's_state (APPTASK { task_state, ... } )
            =
            internal_state_to_external_state  *task_state;

        fun get_task's_alive_threads_count (APPTASK { alive_threads_count, ... } )
            =
            *alive_threads_count;               

        fun same_task ( APPTASK { task_id => id1, ... },
                        APPTASK { task_id => id2, ... }
                      )
            =
            id1 == id2;

        fun compare_task ( APPTASK { task_id => id1, ... },
                           APPTASK { task_id => id2, ... }
                         )
            =
            int::compare (id1, id2);

        # If the given task is still ALIVE, move it to SUCCESS or FAILURE
        # (per 'success') and set its task_condvar.
        #
        # No-op for a task whose task_id is not positive, and for a task
        # which has already left the ALIVE state.  Only the task record is
        # updated here;  the states of the task's threads are not touched.
        #
        # Must not be called from within an uninterruptible scope (asserted).
        #
        fun kill_task { success, task => APPTASK { task_id, task_state, task_condvar, ... } }
            =
            if (task_id > 0)                                                                            # Default task cannot be killed.
                #
                                                                                                        mps::assert_not_in_uninterruptible_scope "kill_task";
                log::uninterruptible_scope_mutex := 1;
                    #
                    case *task_state
                        #
                        itt::state::ALIVE
                            =>
                            {   task_state :=  (success  ??  itt::state::SUCCESS                        # Set it to non-ALIVE.
                                                         ::  itt::state::FAILURE);
                                #
                                #                                                                       # We do NOT zero alive_threads_count here;  we let it decrement naturally
                                #                                                                       # as the various threads in the task discover they are dead.  
                                #
                                mop::set_condvar__iu  task_condvar;                                     # Tell the world it is now non-ALIVE.
                                                                                                        # NB: We are very careful to set done_condvar EXACTLY once, immediately AFTER exiting ALIVE state.
                            };

                        _    => ();                                                                     # Killing an already-finished task is a no-op.
                    esac;
                    #
                log::uninterruptible_scope_mutex := 0;
            fi;





        fun same_thread ( MICROTHREAD { thread_id => id1, ... },
                          MICROTHREAD { thread_id => id2, ... }
                        )
            =
            id1 == id2;

        fun compare_thread ( MICROTHREAD { thread_id => id1, ... },
                             MICROTHREAD { thread_id => id2, ... }
                           )
            =
            int::compare (id1, id2);

        fun hash_thread (MICROTHREAD { thread_id, ... } )
            =
            unt::from_int  thread_id;

        fun get_thread's_name (MICROTHREAD { name, ... } )
            =
            name;

        fun get_thread's_id (MICROTHREAD { thread_id, ... } )
            =
            thread_id;

        get_thread's_id_as_string
            =
            itt::get_thread's_id_as_string;

        fun get_thread's_state (MICROTHREAD { state, ... } )
            =
            internal_state_to_external_state  *state;

        fun get_thread's_task (MICROTHREAD { task, ... } )
            =
            task;

        fun kill_thread { success, thread => (MICROTHREAD { done_condvar, state, task, ... }) }                                 # If given thread is alive, kill it.  Written 2012-08-11 CrT -- I hope this works. :-)
            =
            {   task -> APPTASK { task_id, task_condvar, task_state, alive_threads_count, ... };
                #
                                                                                                                                mps::assert_not_in_uninterruptible_scope "kill_thread";
                log::uninterruptible_scope_mutex := 1;
                    #
                    case *state
                        #
                        itt::state::ALIVE
                            =>
                            {   state :=  (success ??  itt::state::SUCCESS :: itt::state::FAILURE);                             # Set it to non-ALIVE.
                                            #
                                mop::set_condvar__iu  done_condvar;                                                             # Tell the world the thread is now non-ALIVE.
                                                                                                                                # NB: We are very careful to set done_condvar EXACTLY once, immediately AFTER exiting ALIVE state.
                                alive_threads_count :=  *alive_threads_count - 1;                                               # One less live thread in this task.

                                if (not success)
                                    #
                                    case *task_state
                                        #
                                        itt::state::ALIVE
                                            =>
                                            if (task_id > 0)                                                                    # Default task never exits ALIVE state. 
                                                #
                                                task_state :=  itt::state::FAILURE;                                             # If a thread ends in FAILURE, its task ends in FAILURE also.
                                                #
                                                mop::set_condvar__iu  task_condvar;                                             # Tell the world the task is now non-ALIVE.
                                            fi;

                                        _     =>    ();
                                    esac;
                                fi;     

                                case *task_state
                                    #
                                    itt::state::ALIVE
                                        =>
                                        if (task_id > 0)                                                                        # Default task never exits ALIVE state. 
                                            #
                                            task_state :=  itt::state::SUCCESS;                                                 # All threads in task ended in SUCCESS, so task ends in SUCCESS.
                                            #
                                            mop::set_condvar__iu  task_condvar;                                                 # Tell the world the task is now non-ALIVE.
                                        fi;

                                    _     =>    ();
                                esac;
                            };

                        _ =>    ();                                                                                             # Killing a dead thread is a no-op.
                    esac;
                    #
                log::uninterruptible_scope_mutex := 0;
            };

        fun mark_thread_as_done_and_dispatch_next_thread (MICROTHREAD { state, done_condvar, task, ... }, final_state)
            =
            {   task -> APPTASK { task_id, task_condvar, task_state, alive_threads_count, ... };
                #
                                                                                                                                mps::assert_not_in_uninterruptible_scope "mark_thread_as_done_and_dispatch_next_thread";
                log::uninterruptible_scope_mutex := 1;
                    #
                    case *state
                        #
                        itt::state::ALIVE
                            =>
                            {   state :=  final_state;                                                                          # Set it to non-ALIVE.
                                #       
                                mop::set_condvar__iu  done_condvar;                                                             # Tell the world it is now non-ALIVE.

                                alive_threads_count :=  *alive_threads_count - 1;                                               # One less live thread in this task.

                                case final_state
                                    #
                                    itt::state::FAILURE
                                        =>
                                        if (task_id > 0)                                                                        # Default task never dies.
                                            #
                                            task_state :=  itt::state::FAILURE;                                                 # Task fails if any of its threads fail.
                                            #
                                            mop::set_condvar__iu  task_condvar;                                                 # Tell the world the task is now non-ALIVE.
                                        fi;     

                                    itt::state::SUCCESS
                                        =>
                                        case *alive_threads_count
                                            #
                                            0 =>    case *task_state
                                                        #
                                                        itt::state::ALIVE
                                                            =>
                                                            {   task_state :=  itt::state::SUCCESS;                             # Task succeeds if all of its threads succeeded.
                                                                #
                                                                mop::set_condvar__iu  task_condvar;                             # Tell the world the task is now non-ALIVE.
                                                            };

                                                        _   =>    ();
                                                    esac;
                                            _ =>    ();
                                        esac;

                                    _   =>  {   printf "Impossible case -- microthread.pkg\n";
                                                winix__premicrothread::process::exit(1);
                                            };
                                esac;
                            };

                        _         =>    ();
                    esac;
                    #
                mps::dispatch_next_thread__xu__noreturn ();
            };

        # Record that the given thread died from uncaught exception 'exn',
        # then run the thread's own exception handler on 'exn'.
        #
        # If the thread is still ALIVE it moves to FAILURE_DUE_TO_UNCAUGHT_EXCEPTION exn,
        # its done_condvar is set and the task's live-thread count is decremented.
        # If the task has a positive task_id and is itself still ALIVE, the task
        # moves to the same state and its task_condvar is set.
        #
        # The handler is invoked after leaving uninterruptible scope, whether or not
        # the thread was ALIVE;  any exception the handler itself raises is discarded.
        #
        # Must not be called from within an uninterruptible scope (asserted).
        #
        fun do_handler (MICROTHREAD { exception_handler, state, done_condvar, task, ... }, exn)
            =
            {   task -> APPTASK { task_id, task_condvar, task_state, alive_threads_count, ... };
                #
                                                                                                                                mps::assert_not_in_uninterruptible_scope "do_handler (microthread.pkg)";
                log::uninterruptible_scope_mutex := 1;
                    #
                    case *state
                        #
                        itt::state::ALIVE
                            =>
                            {   state :=  itt::state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION  exn;                                   # Set it to non-ALIVE.
                                #
                                mop::set_condvar__iu  done_condvar;                                                             # Tell the world the thread is now non-ALIVE.

                                alive_threads_count :=  *alive_threads_count - 1;                                               # One less live thread in this task.

                                if (task_id > 0)                                                                                # Default task never dies.
                                    #
                                    case *task_state
                                        #
                                        itt::state::ALIVE
                                            =>
                                            {   task_state :=  itt::state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION  exn;              # Thread death-by-uncaught-exception *always* kills the task.
                                                #
                                                mop::set_condvar__iu  task_condvar;                                             # Tell the world the task is now non-ALIVE.
                                            };

                                        _     =>    ();                                                                         # Task already finished:  leave its recorded outcome alone.
                                    esac;
                                fi;
                            };

                        _     =>    ();                                                                                         # Thread already marked done:  nothing to record.
                    esac;
                    #
                log::uninterruptible_scope_mutex := 0;

                *exception_handler exn                                                                                          # Give the per-thread handler its chance to react.
                except
                    _ = ();                                                                                                     # A failing handler must not take us down with it.
            };

        # Optional settings accepted (as a list) by make_thread'.
        # If the same option appears more than once in the list, the last occurrence wins.
        #
        Make_Thread_Args =  THREAD_NAME  String                                                         # Future-proofing:  we can add additional options (priority?) here in future without breaking existing code.
                         |  THREAD_TASK  Apptask                                                        # New thread will be a member of this task (instead of defaulting to same task as parent thread).
                         ;

        fun run_thread__xu  new_thread  f x 
            =
            fate::call_with_current_fate
                (\\ old_fate
                    =
                    {
                        mps::enqueue_old_thread_plus_old_fate_then_install_new_thread  {  new_thread,  old_fate  };
                        log::uninterruptible_scope_mutex := 0;

                        {   f x;
                            #
                        }
                        except
                            ex =  do_handler (new_thread, ex);

                        mark_thread_as_done_and_dispatch_next_thread  (new_thread, itt::state::SUCCESS);
                    }
                );

        # Create a new thread running  f x  and return its Microthread record.
        #
        # 'args' is a list of Make_Thread_Args options.  Defaults are an empty
        # name and the task of the calling thread;  options are folded left to
        # right by the local helper 'makethread', so a later THREAD_NAME or
        # THREAD_TASK overrides an earlier one.
        #
        # The new thread is started at once via run_thread__xu.
        # Must not be called from within an uninterruptible scope (asserted).
        #
        fun make_thread' args f x 
            =
            {
                (mps::get_current_microthread ())
                    ->
                    MICROTHREAD { task, ... };

                makethread { args, name => "", task };                                                          # Default to creating new thread in current task with empty name.
            }
            where
                fun makethread  { args => (THREAD_NAME n  ! rest),  name, task }
                        =>
                        makethread { args => rest,  name => n,  task };                                         # Note caller-supplied non-default  name  for new thread.

                    makethread  { args => (THREAD_TASK t  ! rest),  name, task }
                        =>
                        makethread { args => rest,  name, task => t };                                          # Note caller-supplied non-default  task  for new thread.

                    makethread { args => [], name, task }                                                       # All options consumed:  actually build and launch the thread.
                        =>
                        {
                            task -> APPTASK { alive_threads_count, ... };

                                                                                                                mps::assert_not_in_uninterruptible_scope "make_thread";
                            log::uninterruptible_scope_mutex := 1;
                            #
                                alive_threads_count :=  *alive_threads_count + 1;                               # Task now has one more live thread.

                                new_thread = make_microthread__iu  name  task;

# (mps::get_current_microthread ()) -> MICROTHREAD { name => thread_name, thread_id, task => old_task, ... };
# old_task -> APPTASK { task_name => old_task_name, task_id => old_task_id, ... };
# task -> APPTASK { task_name, task_id, ... };
# new_thread -> MICROTHREAD { name => new_thread_name, thread_id => new_thread_id, ... };
# log::note {. sprintf "make_thread' creating thread %d:%d(%s)"  new_thread_id  task_id  new_thread_name; };
# file::print "make_thread'                                                                           "; mps::print_thread_scheduler_state ();  file::print "\n";
# file::print "make_thread' called by '"; file::print    thread_name; file::print "'==";  mps::print_int 2     thread_id;  file::print " (in task '"; file::print old_task_name; file::print "'="; mps::print_int 2 old_task_id; file::print ") ";
# file::print "CREATING THREAD '";       file::print new_thread_name; file::print "'$=";  mps::print_int 2 new_thread_id;  file::print " (in task '"; file::print     task_name; file::print "'="; mps::print_int 2     task_id; file::print ")\n";
                                #
                            run_thread__xu  new_thread  f x;                                    # Leaves uninterruptible scope itself before running  f x.

                            new_thread;                                                         # Returns our result when (old_thread, old_fate) next gets scheduled to run.
                        };
                end;
            end;        

# Eventually, above should use make_isolated_fate, but that is too slow at present.
# The eventual code would look like:
#
#       fun make_thread' args f x
#           =
#           id
#           where
#             log::uninterruptible_scope_mutex := 1;
#             id = make_microthread__iu name    itt::default_task;
#             fun thread ()
#                 =
#                 (   (f x)
#                     except
#                         ex =  do_handler (id, ex);
#
#                     mark_thread_as_done_and_dispatch_next_thread (id, itt::state::SUCCESS);
#                 );
#
#               fate::call_with_current_fate
#                 (\\ parent_fate
#                     =
#                     ( mps::run_thread (id, parent_fate);
#                       log::uninterruptible_scope_mutex := 0;
#                       fate::throw (fate::make_isolated_fate thread) ();
#                     )
#                 );
#             end;
#

        fun make_thread name f                                                                          # Convenience fn for the most common case.
            =
            make_thread'  [ THREAD_NAME name ]  f  ();



        fun make_task  name  threads
            =
            {   task =  make_apptask  name;                                                             # Make the task record which will be shared by our new threads.
                #
                fun make_and_queue_thread__iu  (threadname: String,  threadbody: Void -> Void)
                    =
                    {   microthread =  make_microthread__iu  threadname  task;                          # Make the Microthread record for the new thread.
                        #
                        threadfate                                                                      # Make the fate for the new thread.
                            =
                            call_with_current_fate
                                (\\ fate
                                    =
                                    {   call_with_current_fate  (\\ fate' =  switch_to_fate  fate  fate');
                                        #
                                        threadbody ()                                                   # This block of code becomes 'threadfate'.
                                        except                                                          #
                                            ex =  do_handler (microthread, ex);                         #
                                                                                                        #
                                        mark_thread_as_done_and_dispatch_next_thread                    #
                                            (microthread, itt::state::SUCCESS);                         #
                                    }
                                );

                        mps::push_into_run_queue (microthread, threadfate);                             # Schedule the new thread to run.
                    };


                task -> APPTASK { alive_threads_count, ... };

                                                                                                        mps::assert_not_in_uninterruptible_scope "make_task";
                log::uninterruptible_scope_mutex := 1;                                                  # We want all threads in task queued before any have a chance to run, to avoid weird
                    #                                                                                   # race conditions where one throws an exception before others get created (say).
                    apply  make_and_queue_thread__iu  threads;
                    #
                    alive_threads_count :=  list::length threads;
                    #
                log::uninterruptible_scope_mutex := 0;

                task;
            };




        # Mailop built on the thread's done_condvar, which the code in this
        # package sets when the thread leaves the ALIVE state.
        #
        fun thread_done__mailop  (MICROTHREAD { done_condvar, ... } )
            =
            mop::wait_on_condvar'  done_condvar;                                                        # This and the next are the only uses of   wait_on_condvar'

        # Mailop built on the task's task_condvar, which the code in this
        # package sets when the task leaves the ALIVE state.
        #
        fun   task_done__mailop  (  APPTASK { task_condvar, ... } )
            =
            mop::wait_on_condvar'  task_condvar;                                                        # This and the prev are the only uses of   wait_on_condvar'


        fun get_current_microthread ()
            =
            if (tsr::thread_scheduler_is_running())    mps::get_current_microthread ();
            else                                       raise exception THREAD_SCHEDULER_NOT_RUNNING;
            fi;


        # Return the name of the currently running thread, or the
        # placeholder "[no thread]" when the scheduler is not running.
        #
        fun get_current_microthread's_name ()
            =
            get_thread's_name (get_current_microthread ())
            except
                THREAD_SCHEDULER_NOT_RUNNING
                    =
                    "[no thread]";
                    #
                    # When microthread_preemptive_scheduler is not running
                    # get_thread() returns garbage              XXX BUGGO FIXME
                    # and using that result will SEGV us.
                    #
                    # We return a dummy value here (rather
                    # than letting the exception propagate)
                    # for the convenience of logger.pkg
                    # logging.

                                                                                my _ =
        log::get_current_microthread's_name__hook := get_current_microthread's_name;            # Install the function above in the 'log' package's hook, at package-initialization time.

        fun get_current_microthread's_id ()
            =
            {
                (get_current_microthread ())
                    ->
                    MICROTHREAD { thread_id, ... };

                thread_id;
            }
            except
                THREAD_SCHEDULER_NOT_RUNNING
                    =
                    0;
                    #
                    # See comments to get_current_microthread's_name (), above.



        # Terminate the calling thread, recording SUCCESS or FAILURE per 'success',
        # and switch to the next runnable thread.
        #
        # Raises THREAD_SCHEDULER_NOT_RUNNING (via get_current_microthread) when
        # the scheduler is not running.
        #
        fun thread_exit { success }
            =
            {   (get_current_microthread ())
                    ->
#                   (tid as MICROTHREAD { properties, ... } );
                    (tid as MICROTHREAD { properties, name, ... } );                            # 'properties' and 'name' are referenced only by the commented-out debug code below.
# file::print "\n\n====================\nthread_exit("; file::print name; file::print "...\n";
# (mps::get_current_microthread ()) -> MICROTHREAD { name => thread_name, thread_id, task, ... };
# task -> APPTASK { task_name, task_id, ... };
# file::print "thread_exit                                                                      "; mps::print_thread_scheduler_state ();  file::print "\n";
# file::print "thread_exit for "; file::print thread_name; file::print "'="; mps::print_int 2 thread_id; file::print " (in task '"; file::print task_name; file::print "'="; mps::print_int 2 task_id; file::print ") "; file::print ")\n";
# file::print "\n====================\n\n";

                mark_thread_as_done_and_dispatch_next_thread
                  ( tid,
                    success  ??  itt::state::SUCCESS
                             ::  itt::state::FAILURE
                  );
            };


        fun yield ()
            =
            fate::call_with_current_fate
                (\\ fate
                    =
                    {
                                                                                        mps::assert_not_in_uninterruptible_scope "yield (microthread.pkg)";
                        log::uninterruptible_scope_mutex := 1;
                        mps::yield_to_next_thread__xu  fate;
                    }
                );

        # If the thread's current state is FAILURE_DUE_TO_UNCAUGHT_EXCEPTION x
        # return THE x;  in every other state return NULL.
        #
        fun get_exception_that_killed_thread (MICROTHREAD {      state => REF (itt::state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION x), ... } )        =>  THE x;
            get_exception_that_killed_thread _                                                                                          =>  NULL;
        end;

        # Same as above, but for a task:  THE x when the task's current state
        # is FAILURE_DUE_TO_UNCAUGHT_EXCEPTION x, otherwise NULL.
        #
        fun get_exception_that_killed_task   (APPTASK   { task_state => REF (itt::state::FAILURE_DUE_TO_UNCAUGHT_EXCEPTION x), ... } )  =>  THE x;
            get_exception_that_killed_task   _                                                                                          =>  NULL;
        end;

        # Thread-local data 
        #
        stipulate

            fun make_property ()
                =
                {   exception EXCEPTION  X; 

                    fun cons (a, l)
                        =
                        EXCEPTION a ! l; 

                    fun peek []                =>  NULL;
                        peek (EXCEPTION a ! _) =>  THE a;
                        peek (_ ! l)           =>  peek l;
                    end;

                    fun delete []                =>  [];
                        delete (EXCEPTION a ! r) =>  r;
                        delete (x ! r)           =>  x ! delete r;
                    end;

                    { cons, peek, delete };
                };

            fun make_bool ()
                =
                {   exception EXCEPTION;

                    fun peek [] => FALSE;
                        peek (EXCEPTION ! _) => TRUE;
                        peek (_ ! l) => peek l;
                    end;

                    fun set (l, flag)
                        =
                        set (l, [])
                        where
                            fun set ([], _)             =>  if flag  EXCEPTION ! l; else l;fi;
                                set (EXCEPTION ! r, xs) =>  if flag  l; else list::reverse_and_prepend (xs, r);fi;
                                set (x ! r, xs)         =>  set (r, x ! xs);
                            end;
                        end;

                    { set, peek };
                };

            fun get_properties ()
                =
                {   (get_current_microthread ())
                        ->
                        MICROTHREAD { properties, ... };

                    properties;
                };

        herein

            fun make_per_thread_property (init:  Void -> Y)
                =
                {   my { peek, cons, delete }
                        =
                        make_property (); 

                    fun peek_fn ()
                        =
                        peek (*(get_properties ()));

                    fun get_f ()
                        =
                        {   h = get_properties ();

                            case (peek *h)

                                THE b => b;

                                NULL  => {   b = init ();
                                             h := cons (b, *h);
                                             b;
                                         };
                            esac;
                        };

                    fun clr_f ()
                        =
                        {   h = get_properties ();

                            h := delete *h;
                        };

                    fun set_fn x
                        =
                        {   h =  get_properties ();

                            h :=  cons (x, delete *h);
                        };

                    { peek  => peek_fn,
                      get   => get_f,
                      clear => clr_f,
                      set   => set_fn
                    };
                };

            fun make_boolean_per_thread_property ()
                =
                {   my { peek, set }
                        =
                        make_bool ();

                    fun get_f ()
                        =
                        peek(*(get_properties ()));

                    fun set_f flag
                        =
                        {   h = get_properties ();

                            h := set (*h, flag);
                        };

                    { get => get_f,
                      set => set_f
                    };
                };

        end;                                    # stipulate
    };
end;


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext