PreviousUpNext

15.4.1094  src/lib/std/src/hostthread/io-wait-hostthread.pkg

# io-wait-hostthread.pkg
#
# For general background, see "Overview" comments in
#
#     src/lib/std/src/hostthread/io-wait-hostthread.api
#
# Our basic task here is to watch a set of file descriptors
# (typically pipes or sockets) and when an I/O opportunity
# arrives (almost always data now available to read) call
# a corresponding client-provided closure to take advantage
# of that opportunity.
#
# An I/O opportunity can be one of three things:
#
#   o Input file descriptor now has data available to be read. (The typical case of interest.)
#   o Output file descriptor can now accept a data write.      (Much rarer -- usually we just do a blocking write.) 
#   o Out-of-band-data is available to be read.                (Vanishingly rare in practice.)
#
# So our main datastructure requirement is to be able to map
#
#     (filedescriptor, action)
#
# pairs to corresponding client-provided closures ("functions").
#
# Since fds are short integers and actions can be represented
# in two bits, we fold fd+action into a single int which can
# then be used to index a stock red-black tree. 
#
#
# See also:
#
#     src/lib/std/src/hostthread/cpu-bound-task-hostthreads.pkg
#     src/lib/std/src/hostthread/io-bound-task-hostthreads.pkg

# Compiled by:
#     src/lib/std/standard.lib


stipulate
    package fil =  file__premicrothread;                                                        # file__premicrothread                  is from   src/lib/std/src/posix/file--premicrothread.pkg
    package hth =  hostthread;                                                                  # hostthread                            is from   src/lib/std/src/hostthread.pkg
    package im  =  int_red_black_map;                                                           # int_red_black_map                     is from   src/lib/src/int-red-black-map.pkg
    package psx =  posixlib;                                                                    # posixlib                              is from   src/lib/std/src/psx/posixlib.pkg
    package tim =  time;                                                                        # time                                  is from   src/lib/std/time.pkg
    package vu1 =  vector_of_one_byte_unts;                                                     # vector_of_one_byte_unts               is from   src/lib/std/src/vector-of-one-byte-unts.pkg
    package wio =  winix__premicrothread::io;                                                   # winix__premicrothread::io             is from   src/lib/std/src/posix/winix-io--premicrothread.pkg
    package wxp =  winix__premicrothread::process;                                              # winix__premicrothread::process        is from   src/lib/std/src/posix/winix-process--premicrothread.pkg
herein

    package   io_wait_hostthread
    : (weak)  Io_Wait_Hostthread                                                                # Io_Wait_Hostthread            is from   src/lib/std/src/hostthread/io-wait-hostthread.api
    {
        #################################################################
        # BEGIN type definitions section

        # The two file descriptors of a pipe.
        # In this file 'infd' is the end we watch for readability (see default_wait_request_list)
        # and 'outfd' is the end we write a wake-up byte to (see write_to_private_pipe).
        #
        Pipe    =  { infd:   psx::File_Descriptor,
                     outfd:  psx::File_Descriptor
                   };

        # One record type for each request
        # supported by the server:
        #
        Do_Stop =  { per_who: String,   reply: Void   -> Void };                                # Edit to suit. The 'reply' thunks will typically send a do_something() request back to the originating hostthread.
        Do_Echo =  { what:    String,   reply: String -> Void };

        # Argument record for note_iod_reader():
        #
        Do_Note_Iod_Reader
          =
          { io_descriptor:      wio::Iod,
            read_fn:            wio::Iod -> Void                                                # Call this fn (closure) on iod whenever input is available on fd.
          };

        # Argument record for note_iod_writer():
        #
        Do_Note_Iod_Writer
          =
          { io_descriptor:      wio::Iod,
            write_fn:           wio::Iod -> Void                                                # Call this fn (closure) on iod whenever output is possible on fd.
          };

        # Argument record for note_iod_oobder():
        #
        Do_Note_Iod_Oobder
          =
          { io_descriptor:      wio::Iod,
            oobd_fn:            wio::Iod -> Void                                                # Call this fn (closure) on iod whenever out-of-band data ("oobd") is available on this fd.
          };

        # NB: Only the stop/echo/test requests appear in this union (and hence in request_queue).
        #     The Do_Note_Iod_* records are not queued: the note_iod_* fns that take them do
        #     their work directly in the context of the calling client hostthread.
        #
        Request =  DO_STOP              Do_Stop                                                 # Union of above record types, so that we can keep them all in one queue.
                |  DO_ECHO              Do_Echo
|  DO_TEST            String                                                  # Nominally temporary debug code;  'String' identifies caller for debug purposes. 
                ; 

        # END of type definitions section
        #################################################################



        #################################################################
        # BEGIN MUTABLE STATE SECTION
        #
        pid                          =   REF 0;                                                 # pid of current process while server is running, otherwise zero.
        server_hostthread_is_running =   REF FALSE;     

        client_fd_count              =   REF 0;                                                 # Number of file descriptors we are watching for clients.
                                                                                                # The "for clients" qualifier is because this count does not include the fd for our private pipe().
                                                                                                # This count is support for is_doing_useful_work ().
                                                                                                # NB: It is incremented by every note_iod_* call and decremented by every drop_iod_* call.

        client_fns
            =
            REF (im::empty:  im::Map( wio::Iod -> Void ));                                      # When an fd is ready to read, we'll look it up in this and call resulting read_fn on it. (Or similar for writing or out-of-band data.)
                                                                                                # Keys are the packed (fd,op) ints produced by fdop_to_index().
        

        private_pipe  =  REF  (NULL:  Null_Or(Pipe));                                           # This is a private pipe we use to wake our server hostthread by sending it a byte.
                                                                                                # Since it spends all of its time sleeping on a  wio::wait_for_io_opportunity (C select()/poll())
                                                                                                # call, sending it a byte is the simplest and best way to wake it. (See Note[1].)

        wait_requests =  REF ([]:  List( wio::Ioplea ));                                        # This is the set of file descriptors to watch for I/O opportunities via wio::wait_for_io_opportunity (C select()/poll()).
                                                                                                # INVARIANT: No two entries on list have same Iod_Descriptor value.

#       timeout       =  REF (tim::from_float_seconds 0.02);                                    # Set up to timeout at 50Hz.

# THIS VALUE IS ONLY A TEMPORARY DEBUG HACK:
 timeout              =  REF (tim::from_float_seconds 0.50);                                    # 0.50 sec == 2Hz (debug setting); the commented-out 0.02 line above is the 50Hz setting.
# timeout             =  REF (tim::from_float_seconds 0.20);                                    # 0.20 sec == 5Hz.

                                                                                                # microthread_preemptive_scheduler              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
        per_loop_fns  =  REF ([]: List( Ref(Void -> Void)) );                                   # Functions to call once each time around the main outer loop.
                                                                                                # See note_per_loop_fn/drop_per_loop_fn comments in     src/lib/std/src/hostthread/io-wait-hostthread.api

        request_queue =  REF ([]: List(Request));                                               # Queue of pending requests from client hostthreads.

        mutex   =  hth::make_mutex   ();                                                        # These will actually survive a heap save/load cycle, pretty much.
        condvar =  hth::make_condvar ();  

        # END OF MUTABLE STATE SECTION
        #################################################################




        fun print_wait_requests ({ io_descriptor, readable, writable, oobdable } ! rest)        # A little debug-support function, not used in in production code.
                =>
                {   fd =  psx::iod_to_fd (psx::fd_to_int  io_descriptor);
                    #
                    printf "fd %d readable %B writable %B oobdable %B\n"
                            fd
                            readable
                            writable
                            oobdable
                   ;
                };

            print_wait_requests [] =>   ();
        end;


        #################################################################
        # This section implements our tree mapping (iod+op) -> client_fn:

        # Values for our two-bit read/write/oobd field:                         # "oobd" == "out-of-band data".
        #
         read_op =  1;
        write_op =  2;
         oobd_op =  3;
        #
        fun fdop_to_index (fd: psx::File_Descriptor,  op: Int)
            =
            {   fd' =  psx::fd_to_int  fd;
                #
                index =  (fd' << 2) | op;
                #
                index: Int;
            };
        #
        fun index_to_fdop (index: Int)
            =
            {   op =  (index  & 3);
                fd =  (index >> 2);
                #
                (fd, op);
            };


        #
        fun is_running ()
            =
            {
                actual_pid =  wxp::get_process_id ();
                #
                hth::acquire_mutex  mutex;
                    #
                    #
                    if(*pid != 0   and   *pid != actual_pid)
                        pid := 0;
                        server_hostthread_is_running := FALSE;                  # To ensure that if the heap gets dumped to disk and then and reloaded, is_running() will (correctly) return FALSE.
                    fi;
                    #
                    result = *server_hostthread_is_running;
                    #
                hth::release_mutex  mutex;

                result;         
            };

        #
        fun request_queue_is_empty ()                                           # We cannot write just    fun request_queue_is_empty () =  (*request_queue == []);
            =                                                                   # because Request is not an equality type. (The 'reply' fields are functions
            case *request_queue    [] => TRUE;                                  # and Mythryl does not support comparison of thunks for equality.)
                                   _  => FALSE;
            esac;


        #
        fun default_wait_request_list (pipe: Pipe)
            =
            {   # Our minimal request list is to read
                # the pipe that clients use to wake us:
                #
                io_descriptor =  psx::fd_to_iod  pipe.infd;
                #
                [ { io_descriptor,
                    #
                    readable => TRUE,
                    writable => FALSE,
                    oobdable => FALSE
                  }
                ]; 
            };

        #
        fun get_timeout_interval ()
            =
            *timeout;                                                                                                                   # See comments in   src/lib/std/src/hostthread/io-wait-hostthread.api
        #
        fun set_timeout_interval  time                                                                                                  # Set timeout to use for our wio::wait_for_io_opportunity calls (C select()/poll().)
            =
            {
                hth::acquire_mutex  mutex;                                                                                              # Using the mutex here does have a point -- it
                    #                                                                                                                   # ensures that no mutex-locked critical section will
                    timeout :=  time;                                                                                                   # see the timeout changing unexpectedly out from under it.
                    #
                hth::release_mutex  mutex;
            };


        # For background on these two see comments in
        #
        #     src/lib/std/src/hostthread/io-wait-hostthread.api
        #
# Commented these two out because
# 1)  They call external fns while holding the mutex,
#     which isn't safe -- could threadswitch and deadlock
# 2)  Nobody was calling them anyhow.   -- 2012-11-11 CrT
#       fun note_per_loop_fn  loopfn                                                                                                    # Set fn to be called once each time around our outer loop.
#           =                                                                                                                           # This fn is used to drive pre-emptive multi-threading in the
#           {                                                                                                                           # threadkit scheduler:          src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
#               hth::acquire_mutex  mutex;
#                   #
#                   if (not (list::in (loopfn, *per_loop_fns)))                                                                         # Don't add fn to list if it is already in the list.
#                       #
#                       per_loop_fns :=  loopfn ! *per_loop_fns;
#                   fi;
#                   #
#               hth::release_mutex  mutex;
#           };
        #
#       fun drop_per_loop_fn  loopfn
#           =
#           {
#               hth::acquire_mutex  mutex;
#                   #
#                   per_loop_fns :=  list::drop (loopfn, *per_loop_fns);
#                   #
#               hth::release_mutex  mutex;
#           };


        #
        fun do_stop (r: Do_Stop)                                                                                                        # Shut down io-wait-hostthread server.
            =                                                                                                                           # Internal fn -- will execute in context of server hostthread.
            {
                hth::acquire_mutex  mutex;  
                    #
                    pid                          := 0;
                    server_hostthread_is_running :=  FALSE;
                    private_pipe                 :=  NULL;
                    #
                hth::release_mutex  mutex;  

                case *private_pipe                                                                                                      # Close pipe fds, so that we don't leak fds continually
                    #                                                                                                                   # if some client stops and starts us continually.
                    THE { infd, outfd }
                        =>                                                                                                              # NB: We avoid calling external fns while holding the mutex due to threat of threadswitch, re-entry and deadlock.
                        {
                            psx::close__without_syscall_redirection  infd;
                            psx::close__without_syscall_redirection  outfd;
                        };

                    NULL => ();
                esac;

                hth::broadcast_condvar condvar;                                                                                         # This will in particular wake up the loop in   stop_server_hostthread_if_running().

                r.reply ();
                #
                hth::hostthread_exit ();                
            };
        #
        fun do_echo (r: Do_Echo)                                                                                                        # Internal fn -- will execute in context of server hostthread.
            =
            r.reply  r.what;

        #
        # Write a single zero byte to the 'outfd' end of our private pipe.
        # Any exception thrown by the write itself is caught and reported via printf
        # rather than propagated.
        #
        fun write_to_private_pipe ()                                                                    # We do this to wake the main server hostthread from its I/O wait sleep.
            =
            {
                # NOTE(review): This lookup sits outside the 'except' below, so if private_pipe
                #               is NULL then presumably 'the' raises and that exception reaches
                #               our caller -- confirm that callers only invoke us while the pipe exists.
                #
                pipe =  the *private_pipe;
                #
                {   bytes_written                                                                       # Ignored.
                        =
                        psx::write_vector__without_syscall_redirection                                  # Write one byte into pipe.
                              (
                                pipe.outfd,
                                one_byte_slice_of_one_byte_unts
                              );
                }
                except
                    x = {                                                                               # Report the failure; we do not re-raise.
                        printf "iow::write_to_private_pipe/XXX EXCEPTION THROWN BY WRITE_VECTOR!\n";
                        (exception_name::exception_message x) -> msg;
                        printf "iow::write_to_private_pipe/XXXb: %s\n" msg;
                    };
            }
            where
                one_byte_slice_of_one_byte_unts                                                         # Just anything one byte long to write into our internal pipe.
                    =
                    vector_slice_of_one_byte_unts::make_full_slice                                      # vector_slice_of_one_byte_unts is from   src/lib/std/src/vector-slice-of-one-byte-unts.pkg
                        #
                        one_byte_vector_of_one_byte_unts
                        where
                            one_byte_vector_of_one_byte_unts                                            # A one-element vector holding the byte value 0.
                                =
                                vu1::from_list
                                    #
                                    [ (one_byte_unt::from_int  0) ];
                        end;
            end;
        #
#       fun self_test caller_id                                                                         # Assumes private_pipe exists -- ie., that server_code() has been called.
#           =   
#           {
# printf "iow::self_test/TOP ...   (%s)\n" caller_id;
# printf "iow::self_test/AAA calling wait_for_io_opportunity...   (%s)\n" caller_id;
#               fds_ready_for_io
#                   =
#                   wio::wait_for_io_opportunity
#                     {
#                       wait_requests =>  *wait_requests,
#                       timeout       =>  THE *timeout
#                     };        
# printf "iow::self_test BBB: Back from waiting for io opportunity,   %d   fds_ready_for_io:   (%s)\n"  (list::length fds_ready_for_io)  caller_id;
# print_wait_requests  fds_ready_for_io;

# printf "iow::self_test CCC: Writing to pipe...   (%s)\n"  caller_id;
#           write_to_private_pipe ();
# printf "iow::self_test DDD: Back from writing to pipe...   (%s)\n" caller_id;

# printf "iow::self_test EEE: calling wait_for_io_opportunity...   (%s)\n" caller_id;
#               fds_ready_for_io
#                   =
#                   wio::wait_for_io_opportunity
#                     {
#                       wait_requests =>  *wait_requests,
#                       timeout       =>  THE *timeout
#                     };        
# printf "iow::self_test FFF: Back from waiting for io opportunity,   %d   fds_ready_for_io:    (%s)\n" (list::length fds_ready_for_io) caller_id;
# print_wait_requests  fds_ready_for_io;

# printf "iow::self_test GGG: Doing   apply  dummy_process_io_ready_fd   fds_ready_for_io   ...    (%s)\n" caller_id;
#               apply  dummy_process_io_ready_fd   fds_ready_for_io;                                            # Handle any new I/O opportunities.
# printf "iow::self_test HHH: Done    apply  dummy_process_io_ready_fd   fds_ready_for_io    (%s)\n" caller_id;
# printf "iow::self_test ZZZ   (%s)\n" caller_id;
#           }
#           where
#               fun dummy_process_io_ready_fd  { io_descriptor => iod, readable, writable, oobdable }
#                   =
#                   {
# printf "iow::dummy_process_io_ready_fd/TOP: Doing read_as_vector()    (%s)\n" caller_id;
#                      bytevector
#                          =    
#                          psx::read_as_vector__without_syscall_redirection                                                             # Read and discard the byte that was sent to us.
#                            {
#                              file_descriptor   =>  iod,
#                              max_bytes_to_read =>  1
#                            };

# printf "iow::dummy_process_io_ready_fd/AAA: Done read, (vu1::length bytevector) d=%d    (%s)\n" (vu1::length bytevector) caller_id;
#                      if ((vu1::length bytevector) == 0)                                               # We expect to see a 1-byte result.
#                          #
# printf "iow::dummy_process_io_ready_fd/BBB: ZERO LENGTH RESULT UNEXPECTED!    (%s)\n" caller_id;
#                          wxp::sleep 0.1;
# printf "iow::dummy_process_io_ready_fd/CCC: slept 0.1.    (%s)\n" caller_id;
#                      fi;
# printf "iow::dummy_process_io_ready_fd/ZZZ    (%s)\n" caller_id;
#                   };
#           end;
#       #
#       fun do_test (caller_id: String)                                                                                                 # Internal fn -- will execute in context of server hostthread.
#           =
#           {
# printf "iow::do_test/AAA   (%s)\n" caller_id;
#               hth::acquire_mutex mutex;  
# printf "iow::do_test/BBB   (%s)\n" caller_id;
#                   self_test   caller_id;
# printf "iow::do_test/CCC   (%s)\n" caller_id;
#                   self_test_complete := TRUE;
# printf "iow::do_test/DDD   (%s)\n" caller_id;
#                   hth::broadcast_condvar condvar;                                                                                     # This will in particular wake up the loop in   stop_server_hostthread_if_running().
# printf "iow::do_test/EEE   (%s)\n" caller_id;
#               hth::release_mutex mutex;  
# printf "iow::do_test/ZZZ   (%s)\n" caller_id;
#           };

        stipulate
            # A helper fn shared by
            # drop_iod_reader and       
            # drop_iod_writer and
            # drop_iod_oobder:
            #
            fun drop_fn (old_tree,  index)                                                                                              # Drop fn with given index from our red-black tree.
                    =
                    im::drop (old_tree, index);
        herein
            # For the next six fns there is no obvious
            # reason to do the work in the server hostthread,
            # so we go ahead and do it in the context of
            # the client hostthread:
            #
            fun note_iod_reader { io_descriptor, read_fn }                                                                              # Start watching for opportunities to read given iod.
                =
                {   index =  fdop_to_index(  (psx::iod_to_fd  io_descriptor),  read_op  );
                    #
                    hth::acquire_mutex  mutex;
                        #                       
                        client_fns      :=   im::set( *client_fns, index, read_fn );
                        #                       
                        wait_requests   :=   note_read_request (*wait_requests, io_descriptor);
                        #
                        client_fd_count :=  *client_fd_count  + 1;
                        #
                    hth::release_mutex  mutex;
                }
                where
                    fun note_read_request ([], iod)                                                                                     # Drop 'write' request for given iod.
                            =>
                            [ { io_descriptor,  readable => TRUE,  writable => FALSE,  oobdable => FALSE } ];

                        note_read_request (((req as { io_descriptor, readable, writable, oobdable }) ! rest), iod)
                            =>
                            if (io_descriptor == iod)   { io_descriptor, readable => TRUE, writable, oobdable } ! rest;                 # Previous op request(s) for that iod, so just 'write' to them.
                            else                        req   !   note_read_request (rest, iod);
                            fi; 
                    end;
                end;

            #
            fun drop_iod_reader  (iod:  wio::Iod)                                                                               # Stop watching for opportunities to read given iod.
                =
                {   index =  fdop_to_index(  (psx::iod_to_fd  iod),  read_op  );
                    #
                    hth::acquire_mutex  mutex;
                        #                               
                        client_fns      :=   drop_fn (*client_fns, index);
                        #                               
                        wait_requests   :=   drop_read_request (*wait_requests, iod);
                        #                       
                        client_fd_count :=  *client_fd_count  - 1;
                        #                       
                    hth::release_mutex  mutex;
                }
                where
                    fun drop_read_request ([], iod)                                                                                     # Drop read-request for given iod from requests-list.
                            =>
                            [];

                        drop_read_request   ( ( (req as { io_descriptor, readable =>  TRUE,
                                                                         writable =>  FALSE,
                                                                         oobdable =>  FALSE
                                                        }
                                                )
                                                !
                                                rest
                                              ),
                                              iod
                                            )
                            =>
                            if (io_descriptor == iod)   rest;                                                                           # Only request for that iod is 'read', so drop it completely.
                            else                        req   !   drop_read_request (rest, iod);
                            fi; 

                        drop_read_request (((req as { io_descriptor, readable => TRUE, writable, oobdable }) ! rest), iod)              # Multiple op requests fro that iod, so drop only the 'read' one.
                            =>
                            if (io_descriptor == iod)   { io_descriptor, readable => FALSE, writable, oobdable }  !  rest;
                            else                        req   !   drop_read_request (rest, iod);
                            fi; 

                        drop_read_request  (req ! rest,  iod)
                            =>
                            req   !   drop_read_request (rest, iod);
                    end;
                end;



            #
            fun note_iod_writer { io_descriptor, write_fn }                                                                             # Start watching for opportunities to write given iod.
                =
                {   index =  fdop_to_index(  (psx::iod_to_fd  io_descriptor),  write_op  );
                    #
                    hth::acquire_mutex  mutex;
                        #
                        client_fns      :=   im::set( *client_fns, index, write_fn );
                        #
                        wait_requests   :=   note_write_request (*wait_requests, io_descriptor);
                        #
                        client_fd_count :=  *client_fd_count  + 1;
                        #
                    hth::release_mutex mutex;
                }
                where
                    fun note_write_request ([], iod)                                                                                    # Add 'write' request for given iod.
                            =>
                            [ { io_descriptor,  readable => FALSE,  writable => TRUE,  oobdable => FALSE } ];

                        note_write_request (((req as { io_descriptor, readable, writable, oobdable }) ! rest), iod)
                            =>
                            if (io_descriptor == iod)   { io_descriptor, readable, writable => TRUE, oobdable } ! rest;                 # Previous op request(s) for that iod, so just add 'write' to them.
                            else                        req   !   note_write_request (rest, iod);
                            fi; 
                    end;
                end;

            #
            fun drop_iod_writer  (iod:  wio::Iod)                                                                               # Stop watching for opportunities to write given iod.
                =
                {   index =  fdop_to_index(  (psx::iod_to_fd  iod),  write_op  );
                    #
                    hth::acquire_mutex  mutex;
                        #
                        client_fns      :=   drop_fn (*client_fns, index);
                        #
                        wait_requests   :=   drop_write_request (*wait_requests, iod);
                        #
                        client_fd_count :=  *client_fd_count  - 1;
                        #
                    hth::release_mutex  mutex;
                }
                where
                    fun drop_write_request ([], iod)                                                                                    # Drop 'write' request for given iod.
                             =>
                            [];

                        drop_write_request  ( ( (req as { io_descriptor, readable =>  FALSE,
                                                                         writable =>  TRUE,
                                                                         oobdable =>  FALSE
                                                        }
                                                )
                                                !
                                                rest
                                              ),
                                              iod
                                            )
                            =>
                            if (io_descriptor == iod)   rest;
                            else                        req   !   drop_write_request (rest, iod);                                       # Only request for that iod is 'read', so drop it completely.
                            fi; 
                            #

                        drop_write_request (((req as { io_descriptor, readable, writable => TRUE, oobdable }) ! rest), iod)
                            =>
                            if (io_descriptor == iod)   { io_descriptor, readable, writable => FALSE, oobdable } ! rest;                # Multiple op requests for that iod, so drop only the 'write' one.
                            else                        req   !   drop_write_request (rest, iod);
                            fi; 

                        drop_write_request  (req ! rest,  iod)
                            =>
                            req   !   drop_write_request (rest, iod);
                    end;
                end;



            #
            fun note_iod_oobder { io_descriptor, oobd_fn }                                                                              # Start watching for opportunities to read out-of-band data from given iod.
                =
                {   index =  fdop_to_index(  (psx::iod_to_fd  io_descriptor),  oobd_op  );
                    #
                    hth::acquire_mutex  mutex;
                        #
                        client_fns      :=   im::set( *client_fns, index, oobd_fn );
                        #
                        wait_requests   :=   note_oobd_request (*wait_requests, io_descriptor);
                        #
                        client_fd_count :=  *client_fd_count  + 1;
                        #
                    hth::release_mutex mutex;
                }
                where
                    fun note_oobd_request ([], iod)                                                                                     # Add 'oobd' request for given iod.
                            =>
                            [ { io_descriptor,  readable => FALSE,  writable => TRUE,  oobdable => FALSE } ];

                        note_oobd_request (((req as { io_descriptor, readable, writable, oobdable }) ! rest), iod)
                            =>
                            if (io_descriptor == iod)   { io_descriptor, readable, writable, oobdable => TRUE } ! rest;                 # Previous op request(s) for that iod, so just add 'oobd' to them.
                            else                        req   !   note_oobd_request (rest, iod);
                            fi; 
                    end;
                end;

            #
            fun drop_iod_oobder  (iod:  wio::Iod)                                                                               # Stop watching for opportunities to read out-of-band data from given iod.
                =
                {   index =  fdop_to_index(  (psx::iod_to_fd  iod),  oobd_op  );
                    #
                    hth::acquire_mutex  mutex;
                        #
                        client_fns      :=   drop_fn (*client_fns, index);
                        #
                        wait_requests   :=   drop_oobd_request (*wait_requests, iod);
                        #
                        client_fd_count :=  *client_fd_count  - 1;
                        #
                    hth::release_mutex  mutex;
                }
                where
                    fun drop_oobd_request ([], iod)                                                                                     # Drop 'oobd' request for given iod.
                             =>
                            [];

                        drop_oobd_request   ( ( (req as { io_descriptor, readable =>  FALSE,
                                                                         writable =>  FALSE,
                                                                         oobdable =>  TRUE
                                                        }
                                                )
                                                !
                                                rest
                                              ),
                                              iod
                                            )
                            =>
                            if (io_descriptor == iod)   rest;
                            else                        req   !   drop_oobd_request (rest, iod);                                        # Only request for that iod is 'oobd', so drop it completely.
                            fi; 
                            #

                        drop_oobd_request (((req as { io_descriptor, readable, writable, oobdable => TRUE }) ! rest), iod)
                            =>
                            if (io_descriptor == iod)   { io_descriptor, readable, writable, oobdable => FALSE } ! rest;                # Multiple op requests for that iod, so drop only the 'oobd' one.
                            else                        req   !   drop_oobd_request (rest, iod);
                            fi; 

                        drop_oobd_request  (req ! rest,  iod)
                            =>
                            req   !   drop_oobd_request (rest, iod);
                    end;
                end;
        end;




        #
        fun stop_server_hostthread_if_running  (request: Do_Stop)                               # Queue up a stop-server-hostthread request for later execution by do_stop.
            =                                                                                   # External fn -- will execute in context of client hostthread.
            #
            # Three steps, in this order:
            #   1. Under the mutex, push a DO_STOP request onto request_queue.
            #   2. With the mutex released, write to the private pipe to wake the server.
            #   3. Under the mutex again, wait on condvar until server_hostthread_is_running is FALSE.
            #
            # Does nothing at all if the server is not currently flagged as running.
            # NOTE(review): that flag is read here without holding the mutex.
            #
            { 
                if (*server_hostthread_is_running)
                    #
                    hth::acquire_mutex  mutex;  
                        #
                        request_queue :=  (DO_STOP request)  !  *request_queue;                 # Newest request goes on the front of the queue.
                        #
                    hth::release_mutex mutex;  
                    # 
                    write_to_private_pipe();                                                    # Wake up main server hostthread to process request.
                                                                                                # DO NOT DO THIS WHILE HOLDING THE MUTEX!

                    hth::acquire_mutex  mutex;  
                        #
                        for (*server_hostthread_is_running) {                                   # Keep waiting for as long as the server is still flagged as running.
                            #
                            hth::wait_on_condvar (condvar, mutex);                              # Presumably do_stop (not visible here) clears the flag and signals this condvar -- confirm.
                        };
                        #
                    hth::release_mutex mutex;  
                fi;
            };           
        #
        fun echo  (request: Do_Echo)                                                            # Queue up an echo-back-toclient request for later execution by do_echo.  (This is mostly for unit testing.)
            =                                                                                   # External fn -- will execute in context of client hostthread.
            { 
                hth::acquire_mutex mutex;  
                    # 
                    request_queue :=  (DO_ECHO request)  !  *request_queue; 
                    # 
                hth::release_mutex mutex;  
                # 
                write_to_private_pipe();                                                        # Wake up main server hostthread to process request.
            };           

        #
#       fun test  (caller_id: String)                                                           # WE ASSUME ONLY ONE CALLER AT A TIME, SO WE DON'T WORRY ABOUT MUTUAL EXCLUSION.
#           =                                                                                   # External fn -- will execute in context of client hostthread.
#           { 
# printf "iow::test/AAA acquiring mutex...    (%s)\n" caller_id;
#               hth::acquire_mutex  mutex;  
#                   # 
# printf "iow::test/BBB acquired mutex...   (%s)\n" caller_id;
#                   self_test_complete := FALSE;

# printf "iow::test CCC   (%s)\n" caller_id;
#                   request_queue :=  (DO_TEST caller_id)  !  *request_queue; 
#                       # 
#               hth::release_mutex mutex;  
# printf "iow::test DDD   (%s)\n" caller_id;
#                   write_to_private_pipe();                                                    # Wake up main server hostthread to process request.
#                                                                                               # DO NOT DO THIS WHILE HOLDING THE MUTEX!
#               hth::acquire_mutex  mutex;  

# printf "iow::test EEE   (%s)\n" caller_id;
#                   for (not *self_test_complete) {
#                       #
# printf "iow::test FFF   (%s)\n" caller_id;
#                       hth::wait_on_condvar (condvar, mutex);                                  # This condvar will wake us each time  running_servers_count  changes.
#                   };

#                   # 
# printf "iow::test/GGG releasing mutex...  (%s)\n" caller_id;
#               hth::release_mutex mutex;  
# printf "iow::test/ZZZ released mutex.   (%s)\n"  caller_id;
#           };           
        #
        fun get_new_requests  () 
            = 
            { 
                hth::acquire_mutex mutex;  
                    # 
                    new_requests  = *request_queue;
                    # 
                    request_queue := []; 
                    # 
                hth::release_mutex  mutex;  
                # 
                reverse  new_requests;                                                          # 'reverse' to restore original request ordering.
            };           

        #
        fun server_code ()                                                                      # Our server hostthread begins execution here.
            #
            # Startup, done while holding the mutex:
            #   o Flag the server as running.
            #   o Create a fresh private pipe and listen only on it.
            #   o Zero the client-fd count and set the wait timeout.
            #   o Broadcast on condvar.
            # Then release the mutex and enter server_loop().
            #
            = 
            {
                hth::set_hostthread_name "io wait";

                hth::acquire_mutex  mutex;
                    #
                    server_hostthread_is_running :=  TRUE;

                    private_pipe  :=  THE (psx::make_pipe__without_syscall_redirection ());     # We do not close any existing pipe fds here because they might be stale stuff from before a heap dump/load cycle, in which
                                                                                                # case closing them might close something we actually want this time around -- that would produce very mysterious bugs!

                    wait_requests :=  default_wait_request_list (the *private_pipe);            # By default we listen only on our private pipe.

                    client_fd_count := 0;                                                       # As yet we are watching no fds for clients. 

#                   timeout       :=  tim::from_float_seconds  0.02;                            # Start up with timeout frequency set to 50Hz.
# THIS VALUE IS ONLY A TEMPORARY DEBUG HACK:
 timeout          :=  tim::from_float_seconds  0.5;
                    hth::broadcast_condvar condvar;                                             # This will in particular wake up the loop in   start_server_hostthread_if_not_running().
                    #
                hth::release_mutex  mutex;
                #
                server_loop (); 
            } 
            where 
                # Dispatch one queued client request to its handler.
                #
                fun service_request (DO_STOP r) =>  do_stop r; 
                    service_request (DO_ECHO r) =>  do_echo r;
#                   service_request (DO_TEST r) =>  do_test r;
                end; 
                #
                # Handle one ready-for-I/O record returned by the I/O wait call.
                # The record names an iod plus which of read/write/oobd are now possible on it.
                #
                fun process_io_ready_fd  { io_descriptor => iod, readable, writable, oobdable }
                    =
                    {
                        pipe =  the *private_pipe;
                        #
                        if (iod  !=  psx::fd_to_iod  pipe.infd)
                            #
                            # Normal case:  An I/O opportunity has appeared
                            # on a client-specified fd, so call the corresponding
                            # client-supplied handler fn:
                            #
                            # For each op flagged ready, look up the handler filed under (fd, op) and call it with iod.
                            # NOTE(review): 'the' is applied directly to the lookup result and client_fns is read
                            # here without taking the mutex; presumably a missing handler would raise -- confirm this
                            # cannot happen if a client drops a watch while we are dispatching.
                            #
                            if readable         index = fdop_to_index ((psx::iod_to_fd  iod),  read_op);        client_fn =  the (im::get( *client_fns, index ));   client_fn iod;      fi;
                            if writable         index = fdop_to_index ((psx::iod_to_fd  iod), write_op);        client_fn =  the (im::get( *client_fns, index ));   client_fn iod;      fi;
                            if oobdable         index = fdop_to_index ((psx::iod_to_fd  iod),  oobd_op);        client_fn =  the (im::get( *client_fns, index ));   client_fn iod;      fi;
                        else
                            # Special case:  A byte has been sent to us on our
                            # private pipe to wake us from our normal I/O wait.
                            # It means that request_queue holds client-hostthread
                            # request(s) for us to process:
                            #
                            bytevector
                                =       
                                psx::read_as_vector__without_syscall_redirection                        # Read and discard the byte that was sent to us.
                                  {
                                    file_descriptor   =>  pipe.infd,
                                    max_bytes_to_read =>  1
                                  };

                            # Sanity check:
                            #   
                            if ((vu1::length bytevector) == 0)                                          # We expect to see a 1-byte result; a zero-length result is treated as EOF on the pipe.
                                #                                                                       # (max_bytes_to_read is 1, so at most one wake-up byte is consumed per call.)
                                #
                                printf "iow::process_io_ready_fd/OOO: error: Exiting due unexpected EOF on private pipe.    -- src/lib/std/src/hostthread/io-wait-hostthread.pkg\n";
                                wxp::exit_uncleanly 1;
                            fi;

                            ();
                        fi;
                    };
                #
                fun server_loop ()                                                                      # This is the outer loop for the io-wait server hostthread.
                    #
                    # Each pass: wait for I/O (or timeout), dispatch ready fds,
                    # service queued client requests, run the per-loop fns, then
                    # loop again via the self-call at the bottom.
                    #
                    = 
                    {
                        {
                            fds_ready_for_io
                                =
                                wio::wait_for_io_opportunity__without_syscall_redirection               # We *are* a secondary hostthread, so it makes no sense to redirect this call via a(nother) secondary hostthread.
                                  {                                                                     # Also, it wouldn't work because we don't have the infrastructure to harvest the responses to redirected calls.
                                    wait_requests =>  *wait_requests,
                                    timeout           =>  THE *timeout
                                  };

                            apply  process_io_ready_fd   fds_ready_for_io;                              # Handle any new I/O opportunities.

                            apply  service_request  (get_new_requests());                               # Handle any new requests from client hostthreads.

                            apply  (\\ f = *f())   *per_loop_fns;                                       # Give the threadkit scheduler an irregular "clock" pulse to drive preemptive timeslicing.
                            #
                        } except x = {                                                                  # NB: Placing this 'except' clause at Position P would result in a memory leak, as of 2013-04-06 at least. 
                            printf "error: iow::server_loop: Exception!\n";
                            printf "error: iow::server_loop/exception name s='%s'\n" (exceptions::exception_name    x);
                            printf "error: iow::server_loop/exception msg  s='%s'\n" (exceptions::exception_message x);
                            raise exception x;                                                          # Log, then re-raise.  Should probably shut down hard and sudden here. XXX SUCKO FIXME.
                        };

                        server_loop ();                                                                 # Next pass -- deliberately outside the 'except' wrapper above.
                    };                                                                                  # Position P.
            end;


        #
        fun start_server_hostthread_if_not_running  per_who                                             # 'per_who' is a string identifying the client requesting the startup, for logging purposes.
            #
            # Two phases:
            #   1. If the recorded pid is nonzero and differs from our actual pid,
            #      record the new pid and mark the server as not running.
            #   2. If the server is not flagged as running, spawn server_code in a
            #      new hostthread and wait on condvar until the flag goes TRUE.
            #
            # NOTE(review): 'per_who' is not used in the visible body.
            #
            =
            {
                actual_pid = wxp::get_process_id ();

                hth::acquire_mutex mutex;  
                    #
                    if(*pid != 0   and          *pid != actual_pid)                                     # NOTE(review): when *pid is 0 this branch is skipped and pid is not updated here; presumably it is initialised elsewhere -- confirm.
                        pid                          :=  actual_pid;
                        server_hostthread_is_running :=  FALSE;                                         # Ensure that if the heap gets dumped to disk and then reloaded, *server_hostthread_is_running will still be correct.
                    fi;
                    #
                hth::release_mutex  mutex;  
                #
                if (not *server_hostthread_is_running)                                                  # NOTE(review): flag is read here without holding the mutex.
                    #
                    hth::acquire_mutex mutex;  
                        #
                        hth::spawn_hostthread  server_code;
                        #
                        for (not *server_hostthread_is_running) {                                       # Loop until the new hostthread has flagged itself as running.
                            #
                            hth::wait_on_condvar (condvar, mutex);                                      # server_code() broadcasts on this condvar after setting server_hostthread_is_running to TRUE.
                        };
                    hth::release_mutex  mutex;  
                fi;
            };

        fun is_doing_useful_work ()
            #
            # This is support for
            #
            #     no_runnable_threads_left__fate
            # from
            #     src/lib/src/lib/thread-kit/src/glue/threadkit-base-for-os-g.pkg
            #
            # which is tasked with exit()ing if the system is
            # deadlocked -- which is to say, no thread ready
            # to run and provably no prospect of ever having
            # a thread ready to run.
            #
            # If we are listening on any fd, then we can eventually
            # wake up some thread when input arrives on that fd, so
            # the system is not truly deadlocked and no_runnable_threads_left__fate()
            # should not exit:
            =
            if (not (is_running ()))    FALSE;                                                          # If we're not running, we're not doing useful work. :-)
            else
                hth::acquire_mutex  mutex;
                    #
                    got_client_fds =   *client_fd_count > 0;                                            # If we are running, we're doing useful work if-and-only-if we are watching at least one fd for clients.

                    got_requests   =    case *request_queue     [] => FALSE;
                                                                _  => TRUE;
                                        esac;
                    #
                hth::release_mutex  mutex;  

                got_client_fds or got_requests;
            fi; 
    };
end;


#######################################################################
# Note[1]
# Most hostthread servers block on their request_queue when not busy,
# and thus can be woken by a simple hth::broadcast_condvar, but
# here we will be spending almost all our time blocked in a select(),
# so that will not work.
#
# Consequently we use a dedicated pipe to wake our server hostthread.
# By always including a read of this pipe in our wio::wait_for_io_opportunity
# call (== C select()/poll()) we ensure that the server hostthread can
# always be woken just by writing one byte to the pipe.
#
# NB: Modern Linux/Posix provide pselect()/ppoll() calls which can wait
#     on signals as well as fds, so an alternative would be to use them
#     and then wake-via-signal.  But signals are public and finite in number
#     whereas pipes are private and unbounded in number, so the pipe
#     solution still seems cleaner, and efficiency is a non-issue in
#     this application (we do not expect to use this mechanism heavily).



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext