PreviousUpNext

15.4.921  src/lib/src/lib/thread-kit/src/core-thread-kit/io-now-possible-mailop.pkg

## io-now-possible-mailop.pkg
#
# See comments in   src/lib/src/lib/thread-kit/src/core-thread-kit/io-now-possible-mailop.api

# Compiled by:
#     src/lib/std/standard.lib


stipulate
    package fat =  fate;                                                        # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                                    # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package tim =  time;                                                        # time                                  is from   src/lib/std/time.pkg
    package mps =  microthread_preemptive_scheduler;                            # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    package wnx =  winix__premicrothread;                                       # winix__premicrothread                 is from   src/lib/std/winix--premicrothread.pkg
    package wio =  winix__premicrothread::io;                                                   # winix_io__premicrothread              is from   src/lib/std/src/posix/winix-io--premicrothread.pkg
    #
    call_with_current_fate =  fat::call_with_current_fate;
    switch_to_fate         =  fat::switch_to_fate;
herein
    # This package gets referenced in:
    #
    #     src/lib/std/src/threadkit/posix/winix-io.pkg
    #     src/lib/std/src/socket/proto-socket.pkg
    #     src/lib/std/src/posix/winix-data-file-io-driver-for-posix.pkg
    #     src/lib/src/lib/thread-kit/src/posix/threadkit-driver-for-posix.pkg
    #
    package  io_now_possible_mailop
    : (weak) Io_Now_Possible_Mailop                                             # Io_Now_Possible_Mailop        is from   src/lib/src/lib/thread-kit/src/core-thread-kit/io-now-possible-mailop.api
    {
        Ioplea_Item
            =
            { ioplea:           wio::Ioplea,                                    # Ioplea =  { io_descriptor: Iod, readable: Bool, writable: Bool, oobdable: Bool };
              do1mailoprun_status:      Ref( itt::Do1mailoprun_Status ),        # Do1mailoprun_Status = DO1MAILOPRUN_IS_COMPLETE | DO1MAILOPRUN_IS_BLOCKED  Thread; do1mailoprun_status is a mutex enforcing the one-mailop-fires-per-select() rule.
              #
              finish_do1mailoprun:      Void -> Void,                           # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
              fate:             fat::Fate( wio::Ioplea_Result )
            };

        waiting_queue__local =   REF ([]: List( Ioplea_Item ));                 # Icky thread-hostile mutable global state...? XXX SUCKO FIXME

        #
        fun check_for_io_opportunities  wait_requests
            =
            wio::wait_for_io_opportunity                                        # In some OSs (e.g., Linux) this may raise an EINTR error even though it is non-blocking.
              {
                wait_requests,
                timeout => THE tim::zero_time
              }
            except
                _ = [];

                                                                                # NOTE:  As in the case of condition variables -- see 
                                                                                #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                                                                                # -- we need to do the finish_do1mailoprun routine when we enable the
                                                                                # io_mailop (instead of in the mailop-now-ready-to-fire fate).
        #
        fun io_now_possible_on'  ioplea                                         # This fn is called (only) from   src/lib/std/src/threadkit/posix/winix-io.pkg
            =                                                                   #                                 src/lib/std/src/socket/proto-socket.pkg
            itt::BASE_MAILOPS [is_mailop_ready_to_fire]                         #                                 src/lib/std/src/posix/winix-data-file-io-driver-for-posix.pkg
            where

                fun suspend_then_eventually_fire_mailop                         # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
                      {
                        do1mailoprun_status,                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                    # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__suspend_then_eventually_fire_mailops__loop   # After setting up a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     suspend_then_eventually_fire_mailop ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    an_ioplea                                                   
                    where
                        (call_with_current_fate
                            (
                             \\ fate
                                =
                                {   item =  { ioplea, do1mailoprun_status, finish_do1mailoprun, fate };
                                    #
                                    waiting_queue__local :=  item  !  *waiting_queue__local;
                                    #
                                    return_to__suspend_then_eventually_fire_mailops__loop ();                   # Return control to mailop.pkg.
                                    #
                                                                                                                raise exception DIE "io_mailop' -- return_to__suspend_then_eventually_fire_mailops__loop returned";
                                                                                                                # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
                                }
                            )
                        )
                            -> an_ioplea;                                                                       # Execution will pick up on this like when 'fate' is eventually invoked.
                                                                                                                # This will happen below in   add_any_new_fd_io_opportunities_to_run_queue__iu()  after a C-level
                    end;                                                                                        # select()/poll() indicates that fd read/write is possible (i.e., data in input buffer or space in output buffer).

                fun is_mailop_ready_to_fire ()                                                                  # Reppy refers to 'is_mailop_ready_to_fire' calls this 'pollFn'.
                    =                                                                                           # If it returns READY_MAILOP   then the mailop is     a candidate to fire in the do_one_mailop [] call.
                    case (check_for_io_opportunities  [ioplea])                                                 # If it returns UNREADY_MAILOP then the mailop is not a candidate to fire in the do_one_mailop [] call.
                        #
                        [an_ioplea]
                            =>
                                itt::READY_MAILOP
                                  {
                                    fire_mailop => {.   log::uninterruptible_scope_mutex := 0;                  # Reppy refers to 'fire_mailop' as 'doFn'.
                                                        an_ioplea;
                                                    }
                                  };

                        _   =>  itt::UNREADY_MAILOP  suspend_then_eventually_fire_mailop;
                    esac;
            end;


        fun same_ioplea (an_ioplea, ioplea)                                                                     # This is an internal fn, not exported to clients.
            =
            an_ioplea == ioplea;


        fun drop_cancelled_mailops  wait_queue
            =
            # We return the thinned I/O waiting queue
            # along with the list of wait_requests in it.
            #
            # NB: Both will be in reverse order relative
            # to wait_queue arg:
            #
            drop_cancelled_mailops' (wait_queue, [], [])
            where
                fun drop_cancelled_mailops' ([] : List( Ioplea_Item ),   wait_requests,   wait_queue)
                        =>
                        (wait_requests, wait_queue);                                                                                                    # Done.

                    drop_cancelled_mailops' ({ do1mailoprun_status => REF itt::DO1MAILOPRUN_IS_COMPLETE, ... } ! rest,   wait_requests,   wait_queue)
                        =>
                        drop_cancelled_mailops' (rest, wait_requests, wait_queue);                                                                      # Drop completed/cancelled mailop.

                    drop_cancelled_mailops' ((item as { ioplea, ... } ) ! rest,   wait_requests,   wait_queue)
                        =>
                        drop_cancelled_mailops'                                                                                                         # Pass everything else through.
                          ( rest,
                            ioplea  !  wait_requests,
                            item          !  wait_queue
                          );
                end;
            end;


        fun push_io_onto_run_queue                                                                      # Private to this file.
                (
                  { do1mailoprun_status  as  REF (itt::DO1MAILOPRUN_IS_BLOCKED thread),                 # The wait_queue entry corresponding to 'an_ioplea'.
                    finish_do1mailoprun,                                                                        # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                    fate,
                    ioplea
                  },
                  an_ioplea                                                                             # The I/O opportunity now open.
                )
                =>
                # Enqueue a thread that is polling on the ready queue.
                #
                # We need two call_with_current_fate calls here:
                #
                #  o One to capture current fate, so we can continue
                #    with it when we are done.
                #
                #  o One to construct the fate we are going to enter
                #    into the run queue.  This mainly consists of
                #    combining our 'fate' arg with our 'an_ioplea' arg to
                #    produce a new fate implicitly wrapping both.
                #
                # We also must catch the case where the mailop
                # has been canceled, since a single thread might be
                # polling on multiple descriptors.
                #
                {   (call_with_current_fate
                        (
                         \\ main_fate                                                                   # 'main_fate' represents the vanilla computation starting below with   -> fate_ioplea;
                            =
                            {   call_with_current_fate
                                    (\\ fate_ioplea                                                     # 'fate_ioplea' represents the computation  'switch_to_fate fate ioplea'.
                                        =                                                               # (Since switch_to_fate never returns, that is the whole of 'fate_ioplea'.)
                                        switch_to_fate  main_fate  fate_ioplea);                        # This arranges for the main fate to execute as expected.  Without this line
                                #                                                                       # we'd immediately run off and do  'switch_to_fate fate ioplea' and the do1mailoprun_status := ... ; ... stuff would never execute.
                                switch_to_fate  fate  an_ioplea;
                            }
                        )
                    )
                        -> fate_ioplea;                                                                 # Execution picks up here when someone eventually calls main_fate with an argument.
                                                                                                        
                                                                                                        # 'fate_ioplea' is essentially 'fate ioplea'.  It has type  Fate(Void).
                    do1mailoprun_status :=  itt::DO1MAILOPRUN_IS_COMPLETE;

                    finish_do1mailoprun ();

                    mps::push_into_run_queue (thread, fate_ioplea);
                };

            push_io_onto_run_queue ( { do1mailoprun_status => REF itt::DO1MAILOPRUN_IS_COMPLETE, ... }, _)
                =>
                ();
        end;


        fun add_any_new_fd_io_opportunities_to_run_queue__iu ()                                         # This is an external entrypoint into this file.
            =                                                                                           # This fn is called (only) from:        src/lib/src/lib/thread-kit/src/posix/threadkit-driver-for-posix.pkg
            case (drop_cancelled_mailops  *waiting_queue__local)
                #         
                ([], _) =>    waiting_queue__local :=   [];
                #
                (wait_requests, wait_queue)                                                             # NB: wait_requests and wait_queue are both reverse-order relative to  *waiting_queue__local.
                    =>
                    case (check_for_io_opportunities  wait_requests)
                        #
                        [] =>  waiting_queue__local :=   reverse  wait_queue;                           # The 'reverse' restores original *waiting_queue__local ordering.
                        #
                        l  => filter (l, wait_queue, [])
                              where
                                  fun filter ([], r, result_wait_queue)
                                          =>
                                          waiting_queue__local
                                              :=
                                              list::reverse_and_prepend (r, result_wait_queue);         # Reverse 'r' and prepend it to (already-re-reversed) result_wait_queue.

                                      filter
                                          ( an_ioplea ! iopleas,                                        # (Remaining) list of I/O opportunities identified by check_for_io_opportunities.
                                            (item:  Ioplea_Item) ! items,                               # (Remaining) reversed wait_queue.
                                            result_wait_queue                                           # 
                                          )
                                          =>
                                          if (same_ioplea (an_ioplea, item.ioplea))                     # We're searching down our wait_queue for the item matching 'an_ioplea'.
                                              #                                                         # Aha -- found the wait_queue item matching 'an_ioplea'.
                                              push_io_onto_run_queue (item, an_ioplea);                 # Schedule the I/O corresponding to 'an_ioplea'.
                                              filter (iopleas, items, result_wait_queue);               # Drop I/O opportunity from wait list.
                                          else
                                              filter (an_ioplea ! iopleas,  items,  item ! result_wait_queue);  # Not the right item -- move it to result_wait_queue and keep searching down wait_queue.
                                          fi;

                                      filter _ =>   raise exception  DIE "Compiler bug: Unsupported case in add_any_new_fd_io_opportunities_to_run_queue__iu/filter.";
                                  end;
                              end;
                    esac;

            esac;


        fun have_fds_on_io_watch ()
            =
            case *waiting_queue__local
                #
                [] => FALSE;
                 _ => TRUE;
            esac;

    };
end;



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext