PreviousUpNext

15.4.1156  src/lib/std/src/pthread/cpu-bound-task-pthreads.pkg

## cpu-bound-task-pthreads.pkg
#
# Server pthreads to offload cpu-intensive computations
# from the main threadkit pthread.
#
# See also:
#
#     src/lib/std/src/pthread/io-bound-task-pthreads.pkg
#     src/lib/std/src/pthread/io-wait-pthread.pkg

# Compiled by:
#     src/lib/std/standard.lib


stipulate
    package fil =  file;                                                        # file                          is from   src/lib/std/src/posix/file.pkg
    package pth =  pthread;                                                     # pthread                       is from   src/lib/std/src/pthread.pkg
    package wxp =  winix::process;                                              # winix::process                is from   src/lib/std/src/posix/winix-process.pkg
herein

    package cpu_bound_task_pthreads
    :       Cpu_Bound_Task_Pthreads                                             # Cpu_Bound_Task_Pthreads       is from   src/lib/std/src/pthread/cpu-bound-task-pthreads.api
    { 
        # One record for each request
        # supported by the server:
        #
        # Each record carries a 'reply' callback which is invoked
        # when the request is serviced (see do_stop and do_echo).
        #
        Do_Stop =  { who:  String,      reply: Void   -> Void };                # 'who' identifies the requester (it is logged at shutdown); 'reply' is called with no argument.
        Do_Echo =  { what: String,      reply: String -> Void };                # 'reply' is called with 'what' as its argument.

        Request =  DO_STOP  Do_Stop                                             # Union of above record types, so that we can keep them all in one queue.
                |  DO_ECHO  Do_Echo
                |  DO_TASK  (Void -> Void)                                      # An arbitrary thunk; do_task simply calls it.
                ; 

        # 'mutex' is held by stop/echo/do while they prepend to the
        # external queue, and by get_next_request while it inspects
        # and updates both queues.
        #
        # 'condvar' is broadcast each time a request is queued;
        # get_next_request waits on it while both queues are empty.
        #
        mutex   =  pth::make_mutex   (); 
        condvar =  pth::make_condvar ();  

        # 'pid' remembers the process id for which running_servers_count
        # is valid; get_count_of_running_pthreads resets the count to
        # zero whenever the actual process id differs from it.
        #
        pid                   =  REF 0; 
        running_servers_count =  REF 0;                                         # Count of servers running.  Typically as many as cores on the machine, or maybe one less.

        external_request_queue =  REF ([]: List(Request));
        internal_request_queue =  REF ([]: List(Request));
            #
            # We need two queues because clients will prepend
            # requests to the external queue, leaving it in
            # reverse order, but we want to run tasks in
            # submission order.  So periodically when the
            # internal queue is empty we set it to the
            # reversed contents of the external queue.

        fun get_count_of_running_pthreads ()
            =
            {
                #
                actual_pid = wxp::get_process_id ();                            # If the heap gets dumped to disk and then and reloaded, running_servers_count will be bogus.
                #                                                               # We detect this by checking if the pid has changed.  There is of course a small chance
                if(*pid != actual_pid)                                          # that by accident we still have the same pid after a save/reload, in which case we lose.       XXX BUGGO FIXME.
                    pid := actual_pid;                                          # A fix might be to have a generation number associated with each heap image which gets
                    #                                                           # incremented on every save/load cycle.
                    running_servers_count :=  0;
                fi;

                *running_servers_count;
            };


        fun external_request_queue_is_empty ()                                  # We cannot write just    fun request_queue_is_empty () =  (*request_queue == []);
            =                                                                   # because Request is not an equality type. (The 'reply' fields are functions
            case *external_request_queue    [] => TRUE;                         # and Mythryl does not support comparison of functions for equality.)
                                            _  => FALSE;
            esac;

        fun internal_request_queue_is_empty ()                                  # We cannot write just    fun request_queue_is_empty () =  (*request_queue == []);
            =                                                                   # because Request is not an equality type. (The 'reply' fields are functions
            case *internal_request_queue    [] => TRUE;                         # and Mythryl does not support comparison of functions for equality.)
                                            _  => FALSE;
            esac;



        fun do_stop (r: Do_Stop)                                                # Internal fn -- will execute in context of server pthread.
            =
            {   r.reply ();
                #
                fil::log  .{ "src/lib/std/src/pthread/cpu-bound-task-pthreads.pkg: Shutting down per request from '" + r.who + "'."; };
                #
                running_servers_count
                   :=
                   get_count_of_running_pthreads () - 1; 
                #
                pthread::pthread_exit ();               
            };


        fun do_echo (r: Do_Echo)                                                # Internal fn -- will execute in context of server pthread.
            =
            r.reply  r.what;


        fun do_task (task: Void -> Void)                                        # Internal fn -- will execute in context of server pthread.
            =
            task ();



        ###############################################
        # The rest of the file is mostly boilerplate:
        ###############################################

        fun stop  (request: Do_Stop)                                            # External fn -- will execute in context of client pthread.
            = 
            { 
                pth::acquire_mutex mutex;  
                    # 
                    external_request_queue :=  (DO_STOP request)  !  *external_request_queue; 
                    # 
                    pth::broadcast_condvar condvar;  
                    # 
                pth::release_mutex mutex;  
            };           

        fun echo  (request: Do_Echo)                                            # External fn -- will execute in context of client pthread.
            = 
            { 
                pth::acquire_mutex mutex;  
                    # 
                    external_request_queue :=  (DO_ECHO request)  !  *external_request_queue; 
                    # 
                    pth::broadcast_condvar condvar;  
                    # 
                pth::release_mutex mutex;  
            };           

        fun do    (task: Void -> Void)                                          # External fn -- will execute in context of client pthread.
            = 
            { 
                pth::acquire_mutex mutex;  
                    # 
                    external_request_queue :=  (DO_TASK task)  !  *external_request_queue; 
                    # 
                    pth::broadcast_condvar condvar;  
                    # 
                pth::release_mutex mutex;  
            };           


        fun get_next_request  () 
            = 
            { 
                pth::acquire_mutex mutex;  
                # 
                for (external_request_queue_is_empty ()
                and  internal_request_queue_is_empty ()
                ){
                    #
                    pth::wait_on_condvar (condvar, mutex);
                };

                case *internal_request_queue
                    #
                    (task ! rest)
                        =>
                        {   internal_request_queue :=   rest;
                            #
                            pth::release_mutex  mutex;  
                            #
                            task;
                        };

                    [] =>
                        case *external_request_queue
                            #
                            (task ! rest)
                                =>
                                {   internal_request_queue :=   reverse  rest;  # Refill internal queue from external one, reversing to restore original request ordering.
                                    external_request_queue :=   [];
                                    #
                                    pth::release_mutex  mutex;  
                                    #
                                    task;
                                };

                            [] => raise exception FAIL "impossible";            # The above 'for' loop condition guarantees one of the two queues is nonempty.
                        esac;
                esac;
            };           

        fun server_loop ()                                                      # This is the outer loop for each cycleserver pthread.
            = 
            {   service_request (get_next_request()); 
                #
                server_loop (); 
            } 
            where 
                fun service_request (DO_STOP r) =>  do_stop r; 
                    service_request (DO_ECHO r) =>  do_echo r;
                    service_request (DO_TASK r) =>  do_task r;
                end; 
            end; 

        fun start who
            =
            {   pth::acquire_mutex mutex;  
                #
                fil::log  .{ "src/lib/std/src/pthread/cpu-bound-task-pthreads.pkg: Starting up server loop in response to '" + who + "'."; };

                pid :=  wxp::get_process_id ();

                running_servers_count
                   :=
                   get_count_of_running_pthreads () + 1; 

                pth::release_mutex  mutex;  
                #
                pth::spawn_pthread  server_loop;

                *running_servers_count;
            };

    };

end;

## Code by Jeff Prothero: Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext