PreviousUpNext

15.4.1090  src/lib/std/src/hostthread/cpu-bound-task-hostthreads.pkg

## cpu-bound-task-hostthreads.pkg
#
# Server hostthreads to offload cpu-intensive computations
# from the main threadkit hostthread.
#
# See also:
#
#     src/lib/std/src/hostthread/io-bound-task-hostthreads.pkg
#     src/lib/std/src/hostthread/io-wait-hostthread.pkg

# Compiled by:
#     src/lib/std/standard.lib


stipulate
    package fil =  file__premicrothread;                                        # file__premicrothread                  is from   src/lib/std/src/posix/file--premicrothread.pkg
    package hth =  hostthread;                                                  # hostthread                            is from   src/lib/std/src/hostthread.pkg
    package wxp =  winix__premicrothread::process;                              # winix__premicrothread::process        is from   src/lib/std/src/posix/winix-process--premicrothread.pkg
herein

    package cpu_bound_task_hostthreads
    :       Cpu_Bound_Task_Hostthreads                                          # Cpu_Bound_Task_Hostthreads    is from   src/lib/std/src/hostthread/cpu-bound-task-hostthreads.api
    { 
        # One record type for each request
        # supported by the server hostthreads.
        #
        # Each record carries a 'reply' callback which the
        # server invokes from its own hostthread (see do_stop
        # and do_echo):
        #
        #   Do_Stop:  'reply' is called with () just before the
        #             serving hostthread updates the counters and exits.
        #   Do_Echo:  'reply' is called with the 'what' string.
        #
        Do_Stop =  { per_who:  String,  reply: Void   -> Void };
        Do_Echo =  { what:     String,  reply: String -> Void };

        # Union of the above request kinds, so that we can
        # keep them all in one queue.  DO_TASK carries a bare
        # thunk which the server simply runs; exceptions raised
        # by the thunk are caught in do_task.
        #
        Request =  DO_STOP  Do_Stop
                |  DO_ECHO  Do_Echo
                |  DO_TASK  (Void -> Void)
                ; 

        # Shared state for the server pool.
        #
        # 'mutex' is held around the counter and queue updates
        # performed by the request-submission and server functions
        # in this package;  'condvar' is broadcast whenever a
        # request is queued or running_servers_count changes.
        #
        mutex   =  hth::make_mutex   (); 
        condvar =  hth::make_condvar ();  

        pid                   =  REF 0;                                         # Process id last seen by this package; used by get_count_of_live_hostthreads() to detect a heap save/reload.
        running_servers_count =  REF 0;                                         # Count of servers running.  Typically as many as cores on the machine, or maybe one less.
        running_thunks_count  =  REF 0;                                         # Count of servers actually processing a request, as opposed to just being blocked waiting for something to do.

        external_request_queue =  REF ([]: List(Request));                      # Clients prepend new requests here (newest first).
        internal_request_queue =  REF ([]: List(Request));                      # Servers pop requests from here (oldest first).
            #
            # We need two queues because clients will prepend
            # requests to the external queue, leaving it in
            # reverse order, but we want to run tasks in
            # submission order.  So when the internal queue
            # is found empty, get_next_request() refills it
            # with the reversed contents of the external queue.

        fun get_count_of_live_hostthreads ()
            =
            {
                actual_pid = wxp::get_process_id ();                            # If the heap gets dumped to disk and then and reloaded, running_servers_count will be bogus.
                #                                                               # We detect this by checking if the pid has changed.  There is of course a small chance
                if(*pid != actual_pid)                                          # that by accident we still have the same pid after a save/reload, in which case we lose.       XXX BUGGO FIXME.
                    pid := actual_pid;                                          # A fix might be to have a generation number associated with each heap image which gets
                    #                                                           # incremented on every save/load cycle.
                    running_servers_count :=  0;
                fi;

                *running_servers_count;
            };


        fun external_request_queue_is_empty ()                                  # We cannot write just    fun request_queue_is_empty () =  (*request_queue == []);
            =                                                                   # because Request is not an equality type. (The 'reply' fields are functions
            case *external_request_queue    [] => TRUE;                         # and Mythryl does not support comparison of functions for equality.)
                                            _  => FALSE;
            esac;

        fun internal_request_queue_is_empty ()                                  # We cannot write just    fun request_queue_is_empty () =  (*request_queue == []);
            =                                                                   # because Request is not an equality type. (The 'reply' fields are functions
            case *internal_request_queue    [] => TRUE;                         # and Mythryl does not support comparison of functions for equality.)
                                            _  => FALSE;
            esac;



        fun do_stop (r: Do_Stop)                                                # Internal fn -- will execute in context of server hostthread.
            =
            {
                r.reply ();

                hth::acquire_mutex  mutex;  
                    #
                    running_servers_count :=  *running_servers_count - 1; 
                    running_thunks_count  :=  *running_thunks_count  - 1; 

                    hth::broadcast_condvar condvar;                             # This will in particular wake up the loop in   change_number_of_server_hostthreads_to().
                    #
                hth::release_mutex  mutex;  
                #
                hostthread::hostthread_exit ();         
            };


        fun do_echo (r: Do_Echo)                                                # Internal fn -- will execute in context of server hostthread.
            =
            r.reply  r.what;


        fun do_task (task: Void -> Void)                                        # Internal fn -- will execute in context of server hostthread.
            =
            task ()
            except _ = ();                                                      # Client thunk should never do this to us.  We should log something if it does.  XXX SUCKO FIXME.



        ###############################################
        # The rest of the file is mostly boilerplate:
        ###############################################

        fun stop_one_server_hostthread  (request: Do_Stop)                      # External fn -- will execute in context of client hostthread.
            = 
            { 
                hth::acquire_mutex mutex;  
                    # 
                    external_request_queue :=  (DO_STOP request)  !  *external_request_queue; 
                    # 
                hth::release_mutex mutex;  

                hth::broadcast_condvar condvar;  
            };           

        fun echo  (request: Do_Echo)                                            # External fn -- will execute in context of client hostthread.
            = 
            { 
                hth::acquire_mutex mutex;  
                    # 
                    external_request_queue :=  (DO_ECHO request)  !  *external_request_queue; 
                    # 
                hth::release_mutex mutex;  

                hth::broadcast_condvar condvar;  
            };           

        fun do    (task: Void -> Void)                                          # External fn -- will execute in context of client hostthread.
            = 
            { 
                hth::acquire_mutex mutex;
                    # 
                    external_request_queue :=  (DO_TASK task)  !  *external_request_queue; 
                    # 
                hth::release_mutex mutex;  
                
                hth::broadcast_condvar condvar;  
            };           


        fun get_next_request  () 
            = 
            {
                hth::acquire_mutex mutex;  
                # 
                running_thunks_count :=  *running_thunks_count  - 1;

                for (external_request_queue_is_empty ()
                and  internal_request_queue_is_empty ()
                ){
                    hth::wait_on_condvar (condvar, mutex);
                };

                running_thunks_count :=  *running_thunks_count  + 1;

                case *internal_request_queue
                    #
                    (task ! rest)
                        =>
                        {   internal_request_queue :=   rest;
                            #
                            hth::release_mutex  mutex;  
                            #
                            task;
                        };

                    [] =>
                        case (reverse  *external_request_queue)
                            #
                            [] => raise exception DIE "impossible";             # The above 'for' loop condition guarantees one of the two queues is nonempty.
                            #
                            [ task ]
                                =>
                                {   external_request_queue :=   [];
                                    hth::release_mutex  mutex;  
                                    task;
                                };

                            (task ! rest)
                                =>
                                {   internal_request_queue :=   rest;           # Refill internal queue from external one, reversing to restore original request ordering.
                                    external_request_queue :=   [];
                                    #
                                    hth::release_mutex  mutex;  
                                    #
                                    task;
                                };

                        esac;
                esac;
            };           

        # Body of each server hostthread; this is the function
        # handed to hth::spawn_hostthread by start_one_server_hostthread.
        #
        # It names the hostthread "cpu", registers it in both
        # counters under the mutex, announces the change on the
        # condvar, and then enters server_loop.
        #
        fun server_code ()                                                      # This is the outer loop for each server hostthread.
            = 
            {
                hth::set_hostthread_name "cpu";

                hth::acquire_mutex mutex;  
                    #
                    running_servers_count :=  *running_servers_count + 1; 
                    running_thunks_count  :=  *running_thunks_count  + 1;       # This will be decremented at the top of  get_next_request().
                    #
                hth::release_mutex  mutex;  

                hth::broadcast_condvar condvar;                                 # This will in particular wake up the loop in   change_number_of_server_hostthreads_to().

                server_loop ();                                                 # Does not return normally: loops until do_stop exits the hostthread or an exception propagates.
            } 
            where 
                # Dispatch one dequeued request to its handler:
                #
                fun service_request (DO_STOP r) =>  do_stop r; 
                    service_request (DO_ECHO r) =>  do_echo r;
                    service_request (DO_TASK r) =>  do_task r;
                end; 

                # Fetch-and-service loop, written as a self-call.
                # Any exception escaping a request is reported via
                # printf and then re-raised.
                #
                fun server_loop ()
                    = 
                    {
                        service_request (get_next_request())
                        except x = {                                                    # NB: Moving this 'except' clause to position P below results in a bad memory leak.
                            printf "error: cpu::server_loop: Exception!\n";
                            printf "error: cpu::server_loop/exception name s='%s'\n" (exceptions::exception_name    x);
                            printf "error: cpu::server_loop/exception msg  s='%s'\n" (exceptions::exception_message x);
                            raise exception x;                                          # Should probably shut down hard and sudden here. XXX SUCKO FIXME.
                        };
                        #
                        server_loop (); 
                    };                                                                  # Position P.
            end; 

        fun start_one_server_hostthread  per_who
            =
            {
                hth::spawn_hostthread  server_code;
            };


        stipulate
            startstop_mutex   =  hth::make_mutex   ();                                  # This mutex allows only one caller at a time into change_number_of_server_hostthreads_to().
        herein
            
            fun change_number_of_server_hostthreads_to  per_who  desired_hostthreads            # Used both to run server hostthreads at system startup and also to stop them at system shutdown.
                =
                #
                # Our job here is to start (or stop) enough hostthreads
                # to make running_servers_count equal to desired_hostthreads:
                #
                hth::with_mutex_do  startstop_mutex  {.                                 # Unlikely we'll ever have simultaneous calls, but lets be totally safe about it.
                    #
                    pid :=  wxp::get_process_id ();

                    current_hostthreads =  get_count_of_live_hostthreads ();

                    if (current_hostthreads !=  desired_hostthreads)
                        #
                        # Start by ordering up the right number
                        # of hostthread births or deaths:       

                        if (current_hostthreads < desired_hostthreads)
                            #
                            for  (i =  current_hostthreads;
                                  i <  desired_hostthreads;
                                ++i
                            ){
                                start_one_server_hostthread  per_who;                                           # 'per_who' just logs party responsible for starting up the hostthread.
                            };

                        else # current_hostthreads > desired_hostthreads

                            for  (i =  desired_hostthreads;
                                  i <  current_hostthreads;
                                ++i
                            ){
                                stop_one_server_hostthread { per_who, reply => (\\ _ = ()) };
                            };
                        fi;

                        # Finish up by waiting until actual number of
                        # hostthreads matches request.
                        #

                        # It would be nice to have a timeout here which
                        # logged an abort message and crashed out if things
                        # took too long, but that seems nontrivial with the
                        # current hostthread api, so we'll be less ambitious:
                        
                        hostthread::acquire_mutex  mutex;                                       # This mutex serializes access to  running_servers_count.
                            #
                            for (*running_servers_count != desired_hostthreads) {
                                #
                                hostthread::wait_on_condvar (condvar, mutex);           # This condvar will wake us each time  running_servers_count  changes.
                            };
                            #
                        hostthread::release_mutex  mutex;
                    fi;
                };
        end;


        fun is_doing_useful_work ()
            #
            # This is support for
            #
            #     no_runnable_threads_left__fate
            # from
            #    src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg
            #
            # which is tasked with exit()ing if the system is
            # deadlocked -- which is to say, no thread ready
            # to run and provably no prospect of ever having
            # a thread ready to run.
            #
            # If we have any hostthread currently processing a request
            # then it may in due course generate a reply waking up
            # a thread, so the system is not provably deadlocked and
            # no_runnable_threads_left__fate() should not exit:
            =
            {
                hostthread::acquire_mutex  mutex;
                    #
                    got_something_running      =  *running_thunks_count > 0;

                    external_queue_is_nonempty =   case *external_request_queue [] => FALSE;
                                                                                _  => TRUE;
                                                   esac;

                    internal_queue_is_nonempty =   case *internal_request_queue [] => FALSE;
                                                                                _  => TRUE;
                                                   esac;
                    #
                hostthread::release_mutex  mutex;

                doing_something =   got_something_running 
                                or  external_queue_is_nonempty
                                or  internal_queue_is_nonempty;

                doing_something;
            };
    };

end;

## Code by Jeff Prothero: Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext