PreviousUpNext

15.4.56  src/app/makelib/concurrency/makelib-thread-queen.pkg

## makelib-thread-queen.pkg

# Compiled by:
#     src/app/makelib/concurrency/makelib-concurrency.sublib



# OVERVIEW
# ========
#
# This is a very simple thread package used by
#
#     src/app/makelib/compile/compile-in-dependency-order-g.pkg
#     src/app/makelib/mythryl-compiler-compiler/mythryl-compiler-compiler-g.pkg
#
# when running multiple copies of the compiler in
# parallel as unix subprocesses so as to save wall-clock
# time when compiling on multi-core machines.
#
# This package does nothing that thread-kit doesn't do better;
# it is here only so we can do parallel compiles even when
# thread-kit isn't installed.
#
#
#
#         NB: Throughout this file, 'thread' refers to lightweight
#             application-specific threads, not heavyweight threads
#             managed by the OS kernel.  In other words, we are here
#             concerned at the in-process level with multiprogramming,
#             not multiprocessing.  (Our unix subprocesses do give
#             us multiprocessing, of course.)
#
#
#
# We create threads using a 'make_makelib_thread' command
# which takes the thread queen plus a function to compute,
# and returns a 'thread' object to the compiler:
#
#     thread = sit::make_makelib_thread  queen  .{ something_to_compute (); };
#
# We also supply a 'wait_for_thread_to_finish_then_return_result' function which may be applied
# to such thread objects, and which yields the final result
# computed by the thread:
#
#     result =  sit::wait_for_thread_to_finish_then_return_result  queen  my_thread;
#
# Calling 'wait_for_thread_to_finish_then_return_result' on a thread T which has not yet
# completed its computation blocks the caller until T terminates.
#
# Thus, a thread exists in one of two states:
#
#   o RUNNING, when it is associated with a queue of blocked
#     threads waiting to read its result value, and
#
#   o DONE, when it is associated with its return value.
#



# Virtual Threads
# ---------------

# "Virtual threads" essentially make our thread wait queues
# available without the bother of having an actual thread.
#
# A virtual thread is created via
#
#      my_virtual_thread =  sit::make_wait_queue  queen;
#
# Note that no actual function to compute is specified,
# and none ever exists.  But threads which do
#
#     sit::wait_for_thread_to_finish_then_return_result  queen  my_virtual_thread;
#
# block just as though it was a regular running thread.
#
# Since there is no actual code running, by default these
# threads would block forever. Consequently, we provide a
#
#     sit::run_all_threads_in_wait_queue
#         #
#         queen  my_virtual_thread;
#
# call which simulates termination of the virtual thread
# and unblocks all threads waiting on its result
# (which is always Void).
#
# Thus, virtual threads facilitate simple kinds of
# manual thread scheduling without having to sink
# all the way to the callcc level.

#


# Stream Proxy Threads
# --------------------

# Since the main purpose of this package is to support
# compiler interaction with multiple compiler subprocesses,
# we need to have a way for a thread to block
# until input becomes available from a given unix pipe.
# Stream proxy threads answer this need.
#
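# A stream proxy thread is created internally by doing
#
#     my_proxy_thread
#         =
#         make_unix_pipe_input_wait_queue
#             #
#             queen
#             #
#             (pipe: file::Input_Stream);
#
# where the stream in question is a pipe from
# a fork()ed unix subprocess.   Any thread which
# reads the "result" of such a proxy thread via
#
#     sit::wait_for_thread_to_finish_then_return_result  queen  my_proxy_thread;
#
# will then block until input is available from
# that unix subprocess.
#
# Client code does not do this by hand; it calls
#
#     sit::read_line_from_unix_pipe  queen  pipe;
#
# which creates the proxy thread, waits on it as
# above, and then reads a line from the pipe.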
#
# Underneath, this is implemented using a unix
# select() / poll() call;  stream proxy threads
# are essentially a graceful way of providing
# thread access to select() / poll() functionality.



# Priorities
# ----------

# To provide some minimal control over scheduling
# of runnable threads, we allow threads to specify
# integer priorities by calling

#     sit::wait_for_thread_to_finish_then_return_result_running_at_priority  queen  some_priority  some_thread;

# instead of just

#     sit::wait_for_thread_to_finish_then_return_result                      queen  some_thread;

# which resumes at the default priority.  When multiple threads are ready to run,
# the thread with the highest priority is selected:
#
#     low_priority  = 1;
#     high_priority = 999;
#
#     starting_gun =  sit::make_wait_queue  queen;
#
#     goes_first  = sit::make_makelib_thread queen .{ sit::wait_for_thread_to_finish_then_return_result_running_at_priority queen high_priority starting_gun;  print "I went first!\n"; };
#     goes_second = sit::make_makelib_thread queen .{ sit::wait_for_thread_to_finish_then_return_result_running_at_priority queen low_priority  starting_gun;  print "I went second.\n"; };
#
#     sit::run_all_threads_in_wait_queue  queen  starting_gun;

# The above will result in 'goes_first' and 'goes_second'
# executing in the obvious order.



# IMPLEMENTATION NOTES
# ====================
#
# Since the point of this package is to provide an
# extremely lightweight alternative to the full
# thread-kit package, we keep things simple by
# providing bare-minimum functionality:
#
#    o We do no pre-emption.  The only way
#      for a thread to give up control of
#      the processor is to terminate or
#      call sit::wait_for_thread_to_finish_then_return_result on another thread.
#
#    o We do select() / poll() calls to check
#      for input from subprocesses only when
#      sit::wait_for_thread_to_finish_then_return_result is called
#      and there are no runnable threads.
#      
#    o For a priority queue we use a simple
#      list maintained using O(N**2) insertion
#      sort.  
#      
# These simplifications work fine for 
#      
#     src/app/makelib/compile/compile-in-dependency-order-g.pkg
#     src/app/makelib/mythryl-compiler-compiler/mythryl-compiler-compiler-g.pkg
#      
# which are written with them in mind, but may
# easily cause problems in general.  For example,
# spewing commands to the subprocesses without
# pausing to read return results from them will
# quickly produce deadlock.
#
# So, in general -- use thread-kit. :)



        #                           "Too much work and too much energy
        #                            kill a man just as effectively as
        #                            too much assorted vice or too much drink."
        #
        #                                           -- Rudyard Kipling


stipulate
    package fil =  file__premicrothread;                                # file__premicrothread          is from   src/lib/std/src/posix/file--premicrothread.pkg
    package wio =  winix__premicrothread::io;                           # winix__premicrothread::io     is from   src/lib/std/src/posix/winix-io--premicrothread.pkg
herein

    api Makelib_Thread_Queen {
        #
        # Interface to a minimal cooperative (non-preemptive) thread
        # scheduler.  Every operation except make_makelib_thread_queen
        # takes the Makelib_Thread_Queen value holding the scheduler state.

        Makelib_Thread(X);                                              # Externally visible type of a thread.
                                                                        # X is the thread result type.
        Makelib_Thread_Queen;                                           # Abstract scheduler state.

        make_makelib_thread_queen: Void -> Makelib_Thread_Queen;        # Create fresh scheduler state: nothing runnable, no pipes watched.

        # Create a new thread which will evaluate the given thunk.
        # The new thread is only placed on the run queue here; it gets
        # to run once the calling thread gives up control.
        #
        make_makelib_thread
            :
            Makelib_Thread_Queen
            ->
            (Void -> X)                                                 # Code to run in thread.
            ->
            Makelib_Thread(X);

        wait_for_thread_to_finish_then_return_result:                      Makelib_Thread_Queen ->        Makelib_Thread(X) -> X;               # Waits, then resumes running at the default priority (0).
        wait_for_thread_to_finish_then_return_result_running_at_priority:  Makelib_Thread_Queen -> Int -> Makelib_Thread(X) -> X;               # Waits, then resumes running at given   priority.
            #
            # Both return immediately if the thread has already finished.
            #
            # The priority given to  wait_for_thread_to_finish_then_return_result_running_at_priority
            # is incremented by one internally, so for any non-negative priority the caller
            # resumes ahead of callers of   wait_for_thread_to_finish_then_return_result


        read_line_from_unix_pipe                                        # While waiting this will yield the process, allowing other Makelib_Threads to run.
            :
            Makelib_Thread_Queen
            -> fil::Input_Stream
            -> Null_Or( String );                                       # Whatever fil::read_line yields -- presumably NULL at end-of-file; confirm against fil::read_line.


        make_wait_queue                                                 # Create a "thread" which will (only) "terminate" when   run_all_threads_in_wait_queue  is called.
            :                                                           # 
            Makelib_Thread_Queen -> Makelib_Thread( Void );

        run_all_threads_in_wait_queue                                   # Move to the run queue all threads in wait queue.
            :                                                           # Does nothing if the wait queue was already released.
            Makelib_Thread_Queen
            -> Makelib_Thread( Void )
            -> Void;


        reset_thread_manager:     Makelib_Thread_Queen -> Void; # Forget all runnable threads and all watched pipes (with the threads waiting on them).
        no_runnable_threads:      Makelib_Thread_Queen -> Bool; # Check whether there are any (other) runnable threads.

        # Plain read/write access to the integer 'cores in use' counter
        # stored in the queen; this package itself never changes it:
        #
        get_cores_in_use:  Makelib_Thread_Queen -> Int;
        set_cores_in_use:  (Makelib_Thread_Queen, Int) -> Void;
    };
end;

stipulate
    package fat =  fate;                                                        # fate                                                          is from   src/lib/std/src/nj/fate.pkg
    package fil =  file__premicrothread;                                        # file__premicrothread                                          is from   src/lib/std/src/posix/file--premicrothread.pkg
    package pur =  file__premicrothread::pur;                                   # file__premicrothread::pur                                     is from   src/lib/std/src/io/winix-text-file-for-os-g--premicrothread.pkg
    package tbi =  winix_base_text_file_io_driver_for_posix__premicrothread;    # winix_base_text_file_io_driver_for_posix__premicrothread      is from   src/lib/std/src/io/winix-base-text-file-io-driver-for-posix--premicrothread.pkg
    package wio =  winix__premicrothread::io;                                   # winix__premicrothread::io                                     is from   src/lib/std/src/posix/winix-io--premicrothread.pkg
    package wnx =  winix__premicrothread;                                       # winix__premicrothread                                         is from   src/lib/std/winix--premicrothread.pkg
herein

    package makelib_thread_queen
    :       Makelib_Thread_Queen
    {
        # Type for (thread, priority) pairs in our priority queues.
        # A suspended thread is represented by the fate (continuation)
        # which resumes it when handed a Void value:
        #
        Priority_Queue_Entry
            =
            (fat::Fate( Void ), Int);                           # (Thread, Priority)


        Thread_Vim(X)
          #
          = DONE(X)                                             # Value of thread. 
          | RUNNING  List( Priority_Queue_Entry )               # Threads waiting for thread value to be computed. 
          ;
            #
            # Type to hold thread state.
            # State is initially RUNNING, carrying a list of
            # threads waiting for the result of this thread.
            # Once the thread result value is known,
            # we change the state to DONE(result), and move all
            # the previously waiting threads to the run queue.

        # A thread is a mutable cell holding its current state,
        # so that the RUNNING -> DONE transition is visible to
        # everyone holding the thread:
        #
        Makelib_Thread(X)
            =
            Ref( Thread_Vim(X) );


        Thread_Priority_Queue                                   # Type for simple, brain-dead priority queue.  Entries are (thread, priority) pairs, highest priority first.
            =
            Ref( List( Priority_Queue_Entry ) );


        Makelib_Thread_Queen                                    # Encapsulates the state for this package. SML/NJ keeps it all in global variables, we keep it in Makelib_State from   src/app/makelib/main/makelib-state.pkg
          =
          { runnable_threads_priority_queue:    Thread_Priority_Queue,                                          # Threads which are ready to run.
            #
            unix_pipes_to_watch:                Ref( List( ( Makelib_Thread( Void ), wio::Ioplea ) ) ),         # (proxy thread, poll request) pairs polled when nothing is runnable.
            #
            cores_in_use:                       Ref( Int ),                                                     # Counter read/written only via get_cores_in_use/set_cores_in_use.
            #
            core_wait_queue:                    Ref( Thread_Vim( Void ) )                                       # Not used within this package.
          };

        fun make_makelib_thread_queen (): Makelib_Thread_Queen
            =
            { runnable_threads_priority_queue   =>  (REF []):   Thread_Priority_Queue,
              unix_pipes_to_watch               =>  REF ([]:   List( ( Makelib_Thread( Void ), wio::Ioplea ) ) ),
              cores_in_use                      =>  REF 0,                                                              # We use this to avoid running more compile child-processes than cores  in  src/app/makelib/compile/compile-in-dependency-order-g.pkg
              core_wait_queue                   =>  REF (RUNNING [])                                                    # We use this to wait for an available core                             in  src/app/makelib/compile/compile-in-dependency-order-g.pkg
            };


        fun get_cores_in_use (queen: Makelib_Thread_Queen)         =  *queen.cores_in_use;
        fun set_cores_in_use (queen: Makelib_Thread_Queen, i: Int) =   queen.cores_in_use := i;

        stipulate
            fun enqueue  (qref as REF queue, x as (_, x_priority))
                =
                # Insert (value, priority) pair x
                # into priority queue 'qr' via side-effect,
                # keeping the latter sorted by priority.
                #
                qref :=  insert  queue
                where
                    fun insert []
                            =>
                            [x];

                        insert ((y as (_, y_priority)) ! rest)                          # This results in an idiot O(N**2) insertion sort.  Why not just use a red-black-tree or priority queue? -- 2011-09-21 CrT
                            =>
                            if (x_priority >= y_priority)   x ! y ! rest;
                            else                            y ! insert rest;
                            fi;
                            #
                            # NB: The ">=" is important here. If we had used ">" then
                            # the code in mythryl-compiler-compiler-g.pkg would not
                            # perform as desired.  In particular, the parser
                            # thread would end up being scheduled first,
                            # effectively preventing the "cmb" message
                            # from being sent to the server processes.
                            # (With preemption this would not be a problem.)
                    end;
                end;


            # Pop first entry off priority queue
            # via side-effect, and return the
            # popped entry:
            #
            fun dequeue (REF [])
                    =>
                    NULL;

                dequeue (queue_ref as REF (first ! rest))
                    =>
                    {   queue_ref := rest;
                        #
                        THE first;
                    };
            end;


            # A thread has terminated, returning 'result'.
            #
            # The thread record 'r' holds the list of threads
            # waiting for this thread to terminate.
            #
            # Change the thread state from RUNNING to DONE (result)
            # via side-effect, and move the waiting threads
            # to the run queue:
            #
            fun handle_thread_termination  (queen: Makelib_Thread_Queen)  (thread_state as REF (RUNNING waiting_threads), result)
                    =>
                    {   thread_state :=  DONE result;
                        #
                        apply'
                            waiting_threads
                            (fn waiting_thread =  enqueue (queen.runnable_threads_priority_queue, waiting_thread));
                    };

                handle_thread_termination queen (REF (DONE _), _)
                    =>
                    {   fil::say .{ "thread terminated twice!"; };
                        #
                        raise exception FAIL "thread";
                    };
            end;


            #
            fun select_on_input_file_descriptors  queen
                =
                case *queen.unix_pipes_to_watch
                    #
                    []  =>  {   fil::say .{ "deadlock!"; };
                                #
                                raise exception FAIL "thread";
                            };

                    subprocesses
                        =>
                        {   # Get a poll list of unix descriptors
                            # corresponding to our unix subprocesses: 
                            #   
                            wait_requests
                                =
                                map #2 subprocesses;



                            # Do a Unix-level poll on our child-pid list.
                            # Since there is nothing else to do (we are only
                            # called if 'runnable_threads_priority_queue' is empty),
                            # we can afford to block at the unix process level
                            # until at  least one child has exited:
                            #   
printf "src/app/makelib/concurrency/makelib-thread-queen.pkg: calling   wio::wait_for_io_opportunity\n";
                            wait_results
                                =
                                wio::wait_for_io_opportunity  { wait_requests,  timeout => NULL };
printf "src/app/makelib/concurrency/makelib-thread-queen.pkg: back from wio::wait_for_io_opportunity\n";

                            # Partition our input list into two
                            # lists not/ready of those which have/not
                            # completed execution:
                            #
                            fun is_ready (_, ioplea: wio::Ioplea)
                                =
                                {   fun same_io_descriptor  (poll_result: wio::Ioplea_Result)
                                        =
                                        wio::compare
                                            (
                                              ioplea.io_descriptor,
                                              poll_result.io_descriptor
                                            )
                                            ==
                                            EQUAL;

                                    list::exists
                                        same_io_descriptor
                                        wait_results;
                                };

                            (list::partition  is_ready  subprocesses)
                                ->
                                (ready, not_ready);


                            # Schedule the 'not_ready' pids
                            # to be checked again next time around:
                            #   
                            queen.unix_pipes_to_watch
                                :=
                                not_ready;

                            # For each 'ready' list entry (which represents
                            # a unix subprocess pipe stream which now has
                            # output available for us to read) mark the matching
                            # proxy thread as terminated, which moves to the
                            # run queue any threads waiting to read its result:
                            #
                            apply
                                (fn (proxy_thread, _) =  handle_thread_termination queen (proxy_thread, ()))
                                ready;

                            # There should now be a ready-to-run
                            # thread in the run queue, since:
                            #
                            # (1) The above poll doesn't return until a
                            #     subprocess has something to read;
                            #
                            # (2) We then set the thread corresponding
                            #     to that subprocess to DONE, which moves all
                            #     threads waiting on it to the run queue;
                            #
                            # (3) There should be at least one such thread
                            #     -- that which spawned that unix subprocess.
                            #
                            # So -- switch to the highest-priority thread in
                            #     runnable_threads_priority_queue:
                            #
                            case (dequeue  queen.runnable_threads_priority_queue)
                                #
                                 NULL =>
                                     {
                                         fil::say .{ "src/app/makelib/concurrency/makelib-thread-queen.pkg: select_on_input_file_descriptors failed to wake anybody up!"; };
                                       #        
                                         raise exception FAIL "thread";
                                     };

                                 THE (thread_state, _)
                                     =>
                                     {  
                                         fat::switch_to_fate  thread_state  ();         # 
                                     };
                            esac;
                        };
                esac;                                                                   # fun select_on_input_file_descriptors ()

            fun run_highest_priority_runnable_thread_else_select_on_input_file_descriptors  queen
                =
                # Pick next thread to run, and run it. 
                # If we have more than one local ready-to-run
                # thread, we run the highest-priority one of them.
                # Otherwise, we wait for input from one of our
                # forked unix subprocesses:
                #
{
log::note .{ "run_highest_priority_runnable_thread_else_select_on_input_file_descriptors/AAA"; };
                case  (dequeue  queen.runnable_threads_priority_queue)
                    #     
                    THE (thread_state, _)
                        =>
                        {
log::note .{ "run_highest_priority_runnable_thread_else_select_on_input_file_descriptors/BBB"; };
                            fat::switch_to_fate  thread_state ();                               # Run local fate. 
                        };

                    NULL =>
                        {
log::note .{ "run_highest_priority_runnable_thread_else_select_on_input_file_descriptors/CCC"; };
                            select_on_input_file_descriptors  queen;                    # Wait for Unix-level input. 
                        };
                esac;
};
        herein

            #
            fun reset_thread_manager  (queen: Makelib_Thread_Queen)                     # Reset all state managed by this package.
                =
                {    queen.runnable_threads_priority_queue :=  [];
                     queen.unix_pipes_to_watch             :=  [];
                };

            #
            fun no_runnable_threads  (queen: Makelib_Thread_Queen)
                =
                list::null *queen.runnable_threads_priority_queue;

            #
            fun make_wait_queue  queen
                =
                (REF (RUNNING [])):  Makelib_Thread( Void );


            #
            fun run_all_threads_in_wait_queue  queen  (REF (DONE ()))
                     =>
                     ();

                run_all_threads_in_wait_queue  queen  waiting_threads
                    =>
                    handle_thread_termination  queen  (waiting_threads, ());
            end;

            stipulate
                # Read thread result, with given priority.
                #
                # This suspends execution of the current
                # thread until target thread finishes.
                #
                # A thread's priority has no effect while
                # it is waiting, but becomes its scheduling
                # priority once it is ready to run -- when
                # a thread with multiple waiting threads
                # is fixed, the highest-priority thread
                # runs first.
                #
                # When we do finally run again, the value
                # of the thread becomes the return value
                # of the result call:
                #
                fun wait_for_thread_to_finish_then_return_result_running_at_priority' queen _ (REF (DONE result))
                        =>
                        # Thread is DONE, just return its result:
                        #
                        result;

                     wait_for_thread_to_finish_then_return_result_running_at_priority' queen priority (thread_state as REF (RUNNING thread_state_list))
                        => 
                        # Thread is not done:  Add ourself to thread's wait
                        # queue and schedule some other thread to run:
                        #
                        {
                            fat::call_with_current_fate
                                #
                                (fn current_thread
                                    =
                                    {
                                        thread_state :=  RUNNING ((current_thread, priority) ! thread_state_list);
                                        #
                                        run_highest_priority_runnable_thread_else_select_on_input_file_descriptors  queen;
                                    }
                                );

                            wait_for_thread_to_finish_then_return_result_running_at_priority'  queen  priority  thread_state;
                        };
                end;
            herein

                #
                fun wait_for_thread_to_finish_then_return_result  queen  thread
                    =
                    wait_for_thread_to_finish_then_return_result_running_at_priority'  queen  0  thread;


                fun wait_for_thread_to_finish_then_return_result_running_at_priority  queen  priority  thread
                    =
                    wait_for_thread_to_finish_then_return_result_running_at_priority' queen  (priority + 1)  thread;
            end;


            # Fire off an internal thread. 
            #
            # 'thunk_for_thread_to_run' is the thunk
            # to be evaluated by the new thread.
            #
            # Return the matching Makelib_Thread.  The new thread does
            # not start running here:  it is only put on the run queue
            # (at priority -1) and runs once it is picked by the scheduler.
            #
            fun make_makelib_thread  queen  thunk_for_thread_to_run
                =
                {   new_thread = REF (RUNNING []);        # The value we return to caller.

                    # We capture two fates here in succession:
                    # o   'return_fate', which is the thread which will return from the 'make_makelib_thread' call;
                    # o   'thread_fate'  which is the make_makelib_thread()ed thread, which will
                    #       1. Evaluate thunk_for_thread_to_run ()
                    #       2. Store thunk_for_thread_to_run()'s result into 'new_thread',
                    #          waking any threads waiting on it.
                    #       3. Terminate the thread by calling
                    #          run_highest_priority_runnable_thread_else_select_on_input_file_descriptors queen.
                    # We put 'thread_fate' in the run queue and then
                    # have 'make_makelib_thread' return 'new_thread':
                    #
                    fat::call_with_current_fate
                        #
                        (fn return_fate
                            =
                            {   fat::call_with_current_fate
                                    #
                                    (fn thread_fate
                                        =
                                        {   enqueue  (queen.runnable_threads_priority_queue,  (thread_fate, -1));      # Priority -1 is below the priorities used by the public wait functions for non-negative requests.
                                            #
                                            fat::switch_to_fate  return_fate  new_thread;                               # Return 'new_thread' to our caller; the code below is not reached on this pass.
                                        }
                                    );

                                # We get here only when the scheduler resumes 'thread_fate',
                                # i.e. when the new thread is actually given the processor.
                                # Run the thunk and publish its result:
                                #
                                handle_thread_termination  queen  (new_thread, thunk_for_thread_to_run ());

                                # The new thread is finished; pass control to whoever is next:
                                #
                                run_highest_priority_runnable_thread_else_select_on_input_file_descriptors  queen;
                            }
                        );
                };


            stipulate
                fun make_unix_pipe_input_wait_queue
                        #
                        (queen:  Makelib_Thread_Queen)
                        #
                        (input_stream:  fil::Input_Stream)
                    =
                    # Construct and return a "thread"
                    # which will "terminate" when the
                    # given (unix pipe) input stream
                    # has data to be read.
                    #
                    # As a side effect, we add the returned
                    # thread plus the Unix poll descriptor
                    # to our unix_pipes_to_watch.
                    #
                    # We implement the required functionality by
                    # polling this list for pending input whenever
                    # we have nothing else to do:
                    #
                    {   reader_and_vector
                            =
                            pur::get_reader  (fil::get_instream  input_stream);

                        thread
                            =
                            case reader_and_vector
                                #
                                (tbi::FILEREADER { io_descriptor => THE io_descriptor, ... }, "")
                                    =>
                                    {   thread  =  REF (RUNNING []);
                                        #
                                        request = { io_descriptor,
                                                    readable => TRUE,
                                                    writable => FALSE,
                                                    oobdable => FALSE
                                                  };

                                        queen.unix_pipes_to_watch
                                            :=
                                            (thread, request) ! *queen.unix_pipes_to_watch;

                                        thread;
                                    };

                                (_, "") =>  {   fil::say .{ "make_unix_pipe_input_wait_queue: bad stream"; };
                                                #
                                                raise exception FAIL "concur";
                                            };

                                (_, _)  =>   REF (DONE ());
                            esac;

                        fil::set_instream                                                       # Sets input_stream to point to a new REF-cell holding same reader_and_vector as before.
                            (
                              input_stream,
                              pur::make_instream  reader_and_vector                             # This allocates a new REF-cell holding reader_and_vector.
                            );

                        thread;
                    };                  # fun make_unix_pipe_input_wait_queue
            herein

                fun read_line_from_unix_pipe
                        #
                        (queen: Makelib_Thread_Queen)
                        #
                        (pipe: fil::Input_Stream)
                        #
                        :  Null_Or( String )
                    =
                    {   # Yield the process while we wait for input -- this
                        # allows other Makelib_Threads to run in the meantime:
                        #
                        wait_queue =    (make_unix_pipe_input_wait_queue  queen  pipe);

                        wait_for_thread_to_finish_then_return_result
                            #
                            queen
                            #
                            wait_queue;

                        fil::read_line  pipe;
                    };
            end;                                                                                # stipulate
        end;                                                                                    # stipulate

    };                                                                                          # package   makelib_thread_queen
end;                                                                                            # stipulate



## (C) 1999 Lucent Technologies, Bell Laboratories
## Author: Matthias Blume (blume@kurims.kyoto-u.ac.jp)
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2013,
## released per terms of SMLNJ-COPYRIGHT.


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext