PreviousUpNext

15.4.923  src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg

## mailop.pkg
#
# Implementation of mailop values and the mailop combinators.
#
# Our core types and datastructures are defined in:
#
#     src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
#
#
#
# Overview
# ========
#
# A 'mailop' represents a computation which might
# or might not be ready to proceed, such as a read
# from a maildrop which might or might not contain
# a value ready to be read.
#
# A mailop is in essence represented by a
#
#     is_mailop_ready_to_fire()
#
# fn which tests to see if the computation
# is ready to proceed and if it
#
#    IS     ready:  Returns a fire_mailop() fn which will perform the computation.
#    is NOT ready:  Returns a suspend_then_eventually_fire_mailop() fn which will arrange
#                   for us to be woken up when/if it is ready to fire.   
#
# This structure encapsulates in function-wrapped
# form the functionality which is needed by the
#
#     do_one_mailop []
#
# fn in order to select and fire exactly one mailop from a list
# of mailops, if necessary blocking until one is ready to fire.
#
#
#
# Implementation
# ==============
#
# Some important requirements on the
# implementation of base mailop values:
#
#  1)  Mailop
#          is_mailop_ready_to_fire()
#          fire_mailop()
#          suspend_then_eventually_fire_mailop()
#      fns are always called from inside an uninterruptible scope.              # "uninterruptible scope" == "critical section" == "atomic region" == ...
#                                                                               # In practice it means that microthread_preemptive_scheduler::thread_scheduler_state is either IN_UNINTERRUPTIBLE_SCOPE or
#                                                                               # IN_UNINTERRUPTIBLE_SCOPE_WITH_PENDING_THREADSWITCH, either of which prevents thread switching.
#  2)  is_mailop_ready_to_fire() mailop fns return an integer priority.         
#      =======================
#      This is   0 when not enabled,
#               -1 for fixed priority and
#               >0 for dynamic priority.
#      The standard scheme is to associate a counter
#      with the underlying mailop value and to increase
#      it by one each time we try to fire the mailop.
#
#
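#  3)  fire_mailop() mailop fns should NOT end the uninterruptible scope.
#      ===========
#      NB: This requirement looks stale and should be re-checked:  The
#      fire_mailop() fns defined in this file (see always' and
#      wait_on_condvar') DO end the uninterruptible scope, by setting
#      log::uninterruptible_scope_mutex := 0.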
#
#
#  4)  suspend_then_eventually_fire_mailop() mailop fns are responsible
#      ========================
#      for ending the uninterruptible scope.
#
#      They are also responsible for executing the "finish_do1mailoprun"
#      action prior to exiting the uninterruptible scope.

# Compiled by:
#     src/lib/std/standard.lib



###                       "Another roof, another proof."
###
###                                     -- Paul Erdos 





stipulate
    package fat =  fate;                                                                                # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                                                            # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package rwq =  rw_queue;                                                                            # rw_queue                              is from   src/lib/src/rw-queue.pkg
    package mps =  microthread_preemptive_scheduler;                                                    # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
herein

    package mailop: (weak)
                            api {
                                include api Mailop;                                                     # Mailop                                is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.api
                                #
                                set_condvar__iu:   itt::Condition_Variable -> Void;
                                wait_on_condvar':  itt::Condition_Variable -> Mailop( Void );           # Exported for use in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
                            }
    {
        # Local unqualified aliases for fns exported by package 'fat' (fate.pkg),
        # so that the code below can call them without the package prefix:
        #
        call_with_current_control_fate  =  fat::call_with_current_control_fate;
        switch_to_control_fate          =  fat::switch_to_control_fate;
        call_with_current_fate          =  fat::call_with_current_fate;                 # Used by wait_on_condvar' (below) to capture the fate of the thread which is about to block.
#       switch_to_fate                  =  fat::switch_to_fate;

        # Some inline functions
        # to improve performance:
        #
        fun map f
            =
            map'
            where
                fun map' [] => [];
                    map' [a] => [f a];
                    map' [a, b] => [f a, f b];
                    map' [a, b, c] => [f a, f b, f c];
                    map' (a ! b ! c ! d ! r) => (f a) ! (f b) ! (f c) ! (f d) ! (map' r);
                end;
            end;
        #
        fun apply f
            =
            apply'
            where
                fun apply' [] => ();
                    apply' (x ! r) => { f x; apply' r;};
                end;
            end;
        #
        fun fold_forward f init l
            =
            foldf (l, init)
            where
                fun foldf ([], accum) => accum;
                    foldf (x ! r, accum) => foldf (r, f (x, accum));
                end;
            end;
        #
        fun error msg
            =
            raise exception DIE msg;

        fun length l
            =
            loop (0, l)
            where
                fun loop (n, [])    =>   n;
                    loop (n, _ ! l) =>   loop (n + 1, l);
                end;
            end;



        # Re-export the core mailop types from internal_threadkit_types (itt).
        # NOTE(review): the '==' forms presumably also bring the constructors
        # (READY_MAILOP, UNREADY_MAILOP, BASE_MAILOPS, CHOOSE_MAILOP, ...) into
        # scope, since the code below uses them unqualified -- confirm.
        #
        Mailop_Readiness   ==  itt::Mailop_Readiness;
        Mailop             ==  itt::Mailop;
        Base_Mailop(X) =  itt::Base_Mailop(X);

        # Two synonyms for Mailop(Void):
        #
        Run_Gun            =   Mailop(Void);
        End_Gun            =   Mailop(Void);

        # Condition variables.
        #
        # Because these variables are set inside
        # atomic regions we have to use different
        # conventions for clean-up, etc.  Instead
        # of requiring the suspend_then_eventually_fire_mailop fate
        # to call the finish_do1mailoprun fn and to end
        # the uninterruptible scope, we call the finish_do1mailoprun
        # function when setting the condition variable
        # (in set_condvar__iu), and have the invariant
        # that the suspend_then_eventually_fire_mailop fate is dispatched
        # outside the atomic region.


                                                                        # Nomenclature: What I'm calling "uninterruptible_scope" is usually called "critical section" or "atomic region"
                                                                        # in the literature.  I dislike "critical" because it is vague. ("critical" in what sense? Who knows?)
                                                                        # "atomic" is literally correct ("a-tomic" == "not cuttable" -- indivisible) but the modern reader is not
                                                                        # likely to take it in that sense at first blush.  And neither "section" nor "region" are as apropos as "scope".
        # Set a condition variable.
        # Caller guarantees that this function is always
        # executed in an uninterruptible scope.
        #
        set_condvar__iu                                                         # Simply re-exports the implementation in microthread_preemptive_scheduler (mps).
            =
            mps::set_condvar__iu;

#       fun set_condvar__iu (itt::CONDITION_VARIABLE state)                                                                     # Only external call is in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
#           =
#           case *state
#               #
#               itt::CONDVAR_IS_NOT_SET  waiting_threads                                                                        # waiting_threads is the list of threads sitting
#                   =>                                                                                                          # blocked waiting for this condvar to be set.
#                   {   mps::foreground_run_queue ->  rwq::RW_QUEUE { back, ... };
#                       #
#                       state :=    itt::CONDVAR_IS_SET  1;                                                                     # Set the condition variable.  NB: The '1' is a priority, not its (non-existent) value.
#                       #
#                       back :=     run  waiting_threads                                                                        # Add to foreground run queue all threads that were waiting for condvar to be set.
#                                   where
#                                       fun run [] =>   *back;
#                                           #
#                                           run ( { do1mailoprun_status=>REF itt::DO1MAILOPRUN_IS_COMPLETE, ... } ! rest)
#                                               =>
#                                               run rest;                                                                       # Drop completed do1mailoprun.
#
#                                           run ( { do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread), finish_do1mailoprun, fate } ! rest)
#                                               =>
#                                               {   do1mailoprun_status :=   itt::DO1MAILOPRUN_IS_COMPLETE;
#                                                   #
#                                                   finish_do1mailoprun ();                                                     # Do stuff like   do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;   and sending nacks.
#
#                                                   (thread, fate)  !  (run rest);
#                                               };
#                                       end;
#                                   end;
#                   };
#
#                _ => error "condvar already set";
#           esac;


        # The mailop constructor for
        # waiting on a condition variable:
        #
        # Given a condition variable, build a Mailop(Void) consisting of
        # a single base mailop which:
        #
        #   o  Is READY when the condvar state is CONDVAR_IS_SET.
        #      Its fire_mailop() just resets log::uninterruptible_scope_mutex to 0.
        #
        #   o  Is UNREADY when the condvar state is CONDVAR_IS_NOT_SET.
        #      Its suspend fn captures the current fate, pushes a
        #      { do1mailoprun_status, finish_do1mailoprun, fate } record
        #      onto the condvar's waiting list, and then calls
        #      return_to__suspend_then_eventually_fire_mailops__loop ().
        #
        fun wait_on_condvar' (itt::CONDITION_VARIABLE  condvar_state)                                                           # This fn is used (only) below and in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
            =
            BASE_MAILOPS [ is_mailop_ready_to_fire ]
            where 
                fun suspend_then_eventually_fire_mailop                                                                         # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                                                    # Does stuff like  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks.
                        return_to__suspend_then_eventually_fire_mailops__loop
                      }
                    =
                    call_with_current_fate
                        #
                        (\\ fate
                            =
                            {   waiting_threads                                                                                 # The list of threads waiting for the condvar to be set.
                                    =
                                    case *condvar_state
                                        #
                                        itt::CONDVAR_IS_NOT_SET waiting_threads =>   waiting_threads;
                                        itt::CONDVAR_IS_SET                     =>   raise exception DIE "Bug in wait_on_condvar'";
                                    esac;                                       #    Above exception should not happen:  is_mailop_ready_to_fire() only queues us up if *condvar_state is not CONDVAR_IS_SET.
                                #
                                waiting_thread =  { do1mailoprun_status,  finish_do1mailoprun,  fate };

                                condvar_state :=  itt::CONDVAR_IS_NOT_SET (waiting_thread ! waiting_threads);                   # Add ourself to list of threads waiting for condvar to be set.

                                return_to__suspend_then_eventually_fire_mailops__loop ();                                                       # Does not return.
                            }
                        );
                #
                fun is_mailop_ready_to_fire ()                                                                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
                    =
                    case *condvar_state
                        #
                        itt::CONDVAR_IS_SET
                            =>
                            {   READY_MAILOP {  fire_mailop  }
                                where
                                    fun fire_mailop ()                                                                          # Reppy refers to fire_mailop as 'doFn'.
                                        =
                                        log::uninterruptible_scope_mutex := 0;                                                  # End uninterruptible scope.
                                end;
                            };

                        itt::CONDVAR_IS_NOT_SET _
                            =>
                            UNREADY_MAILOP  suspend_then_eventually_fire_mailop;
                    esac;
            end;


        # A mailop which is always ready to fire
        # and produces given result:
        #
        fun always'  result                                                                                                     # This is used a lot in (for example)   src/lib/std/src/socket/socket.pkg
            =
            BASE_MAILOPS
              [
                \\ () = itt::READY_MAILOP
                          { fire_mailop => \\ () =  {   log::uninterruptible_scope_mutex := 0;                                  # Reppy refers to fire_mailop as 'doFn'.
                                                        result;
                                                    }
                          }
              ];

        # A mailop which is never ready to fire:
        #
        # Represented as an empty list of base mailops, so there is nothing to poll:
        never' = BASE_MAILOPS [];                                                                                               # Used in:      src/lib/x-kit/widget/old/basic/hostwindow.pkg
                                                                                                                                #               src/lib/x-kit/xclient/src/window/widget-cable-old.pkg
                                                                                                                                
        # These generate mailops on-the-fly while 'do_one_mailop' is running.
        # The second is given a mailop with which to detect client
        # abort of the generated mailop:
        #
        # Both are plain aliases for the corresponding Mailop constructors;
        # the wrapped fns get invoked by scan() (below) when the mailop expression is evaluated.
        #
        dynamic_mailop           =  DYNAMIC_MAILOP;
        dynamic_mailop_with_nack =  DYNAMIC_MAILOP_WITH_NACK;                                                                   # This is mainly used in:   src/lib/std/src/io/winix-text-file-for-os-g.pkg
                                                                                                                                # Also used in:             src/lib/std/src/io/winix-mailslot-io-g.pkg
                                                                                                                                #                           src/lib/std/src/posix/winix-data-file-io-driver-for-posix.pkg
                                                                                                                                #                               

        # Combine a list of mailops into a single mailop:
        #
        fun cat_mailops (mailops:  List(  Mailop(X) ))                                                                          # This gets called in (for example):   src/lib/std/src/io/winix-mailslot-io-g.pkg
            =                                                                                                                   # A frequent idiom is  block_until_mailop_fires (cat_mailops mailops);
            gather (reverse mailops, [])                                                                                        #
            where
                fun gather ([],                               results) =>  BASE_MAILOPS results;                                # Done, return results.
                    #
                    gather (BASE_MAILOPS []       ! rest,  results) =>  gather (rest,           results);
                    gather (BASE_MAILOPS [mailop] ! rest,  results) =>  gather (rest, mailop  ! results);
                    gather (BASE_MAILOPS  mailops ! rest,  results) =>  gather (rest, mailops @ results);
                    #
                    gather (mailops, []) =>  gather' (mailops, []);
                    gather (mailops, l ) =>  gather' (mailops, [BASE_MAILOPS l]);
                end 

                also
                fun gather' ([], [mailop]) =>  mailop;
                    gather' ([], mailops)  =>  CHOOSE_MAILOP mailops;
                    #
                    gather'  (CHOOSE_MAILOP mailops ! rest,  mailops')
                        =>
                        gather' (rest, mailops @ mailops');

                    gather' (BASE_MAILOPS base_mailops ! rest, BASE_MAILOPS base_mailops' ! rest')
                        =>
                        gather' (rest, BASE_MAILOPS (base_mailops @ base_mailops') ! rest');

                    gather' (mailop ! rest, mailops')
                        =>
                        gather' (rest, mailop ! mailops');
                end;
            end;

        fun if_then' (mailop, added_action)
            =
            # Here we implement the "==>" op used in do_one_mailop [...] rules.
            # This op takes two arguments
            #
            #     mailop:        Mailop(X)
            #     added_action:  X -> Y
            #
            # and from them constructs a new mailop of type
            #
            #                    Mailop(Y)
            #
            # which does exactly what the original mailop did,
            # except that afterwards it also does added_action.
            #
            # Recall that (suppressing a few details) a (base) Mailop                   # See   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
            # is essentially a function is_mailop_ready_to_fire:                                
            #
            #      Void -> (   READY_MAILOP { fire_mailop: Void -> X }
            #              | UNREADY_MAILOP ({...} -> X)
            #              )
            #
            # The fn that actually does the useful work here is
            #
            #   fire_mailop
            #
            # -- everything else is just bookkeeping etc -- and
            # our job here is basically to replace it by
            #
            #    added_action o fire_mailop
            #
            # Everything else in this fn is just the busywork of
            # iterating over the expression:
            #
            wrap' mailop
            where
                fun wrap_base_mailop  is_mailop_ready_to_fire  ()                       # Note that a mailop *is* an is_mailop_ready_to_fire fn. We hide that externally as an implementation detail, but it becomes visible at this level.
                    =                                                                   # Note also that 'wrap_base_mailop' is CURRIED -- our caller does not immediate supply our () arg, so we *initially* return a thunk that will
                    case (is_mailop_ready_to_fire ())                                   # *eventually* evaluate is_mailop_ready_to_fire() -- we do not do so initially.  The returned thunk is a new mailop wrapping the old mailop.
                        #
                          READY_MAILOP { fire_mailop }                          =>    READY_MAILOP { fire_mailop => added_action o fire_mailop };                       # The new fire_mailop value here is what this fn is all about.
                        UNREADY_MAILOP suspend_then_eventually_fire_mailop      =>  UNREADY_MAILOP (added_action o suspend_then_eventually_fire_mailop);                # Same as above in slightly different setting.
                    esac;
                #
                fun wrap' (BASE_MAILOPS base_mailops)            =>  BASE_MAILOPS (map  wrap_base_mailop  base_mailops);                                                # Iterate through the base mailops doing the above to them.
                    #
                    wrap' (CHOOSE_MAILOP mailops)                =>  CHOOSE_MAILOP           (map wrap' mailops);                                                       # Iterate through the compound mailops looking for work.
                    #
                    wrap' (DYNAMIC_MAILOP make_mailop)           =>  DYNAMIC_MAILOP           (\\ ()     =  if_then' (make_mailop(), added_action));                    # Same core substitution in setting of dynamic mailops.
                    wrap' (DYNAMIC_MAILOP_WITH_NACK f)           =>  DYNAMIC_MAILOP_WITH_NACK (\\ mailop =  if_then' (f mailop,      added_action));                    # Same core substitution in setting of dynamic mailops with nacks.
                end;
            end;

        (==>) =  if_then';                      # Infix synonym for readability:  (mailop ==> added_action)  is  if_then' (mailop, added_action).
        #
        fun make_exception_handling_mailop (mailop, exception_handler_fn)
            =
            wrap' mailop
            where
                fun wrap f x
                    =
                    f x
                    except
                        exn = exception_handler_fn exn;
                #
                fun wrap_base_mailop  is_mailop_ready_to_fire ()
                    =
                    case (is_mailop_ready_to_fire ())
                        #
                          READY_MAILOP { fire_mailop }                          =>    READY_MAILOP { fire_mailop => wrap fire_mailop };
                        UNREADY_MAILOP suspend_then_eventually_fire_mailop      =>  UNREADY_MAILOP (wrap suspend_then_eventually_fire_mailop);
                    esac;
                #
                fun wrap' (BASE_MAILOPS  base_mailops)  =>  BASE_MAILOPS  (map  wrap_base_mailop  base_mailops);
                    #
                    wrap' (CHOOSE_MAILOP mailops)       =>  CHOOSE_MAILOP (map  wrap'  mailops);
                    #
                    wrap' (DYNAMIC_MAILOP make_mailop)  =>  DYNAMIC_MAILOP           (\\ ()     =  make_exception_handling_mailop (make_mailop(), exception_handler_fn));
                    wrap' (DYNAMIC_MAILOP_WITH_NACK f)  =>  DYNAMIC_MAILOP_WITH_NACK (\\ mailop =  make_exception_handling_mailop (f mailop,      exception_handler_fn));
                end;
            end;

        # Result type of scan() (below):  A mailop expression in which
        # all DYNAMIC_MAILOP and DYNAMIC_MAILOP_WITH_NACK nodes have been
        # evaluated away.
        #
        Scanned_Mailops(X)
          = NACKFREE_MAILOPS  List( Base_Mailop(X) )                                    # Flat list of base mailops with no nack condvars involved.
          | NACKFULL_MAILOPS  List( Scanned_Mailops(X) )                                # List of sub-groups.
          | WITHNACK_MAILOP  (itt::Condition_Variable, Scanned_Mailops(X))              # Sub-group generated by a DYNAMIC_MAILOP_WITH_NACK, paired with the condvar created for it by scan().
          ;

#       +DEBUG
#       fun sayGroup (msg, eg) = let
#             fun f (NACKFREE_MAILOPS l, sl) = "NACKFREE_MAILOPS(" ! int::to_string (list::length l) ::()) ! sl
#               | f (NACKFULL_MAILOPS l, sl) = "NACKFULL_MAILOPS(" ! g (l, ")" ! sl)
#               | f (WITHNACK_MAILOP  l, sl) = "WITHNACK_MAILOP(" ! f(#2 l, ")" ! sl)
#             also g ([], sl) = sl
#               | g ([x], sl) = f (x, sl)
#               | g (x ! r, sl) = f (x, ", " ! g (r, sl))
#             in
#               Debug::sayDebugId (string::cat (msg ! ": " ! f (eg, ["\n"])))
#             end
#       -DEBUG

        # Scan mailop expression to run.
        # In particular, this evaluates all
        # dynamic rules to generate the actual
        # final rules to select between:
        #
        # Convert a Mailop(X) into a Scanned_Mailops(X):
        #
        #   BASE_MAILOPS              Becomes NACKFREE_MAILOPS directly.
        #   DYNAMIC_MAILOP            Its generator fn is called and the result scanned.
        #   DYNAMIC_MAILOP_WITH_NACK  A fresh unset condvar is created;  the generator fn is handed
        #                             wait_on_condvar' of it;  the scanned result is paired with the
        #                             condvar in a WITHNACK_MAILOP.
        #   CHOOSE_MAILOP             Children are scanned in order.  Base mailops are pooled by
        #                             scan_mailops() until some child scans to something other than
        #                             NACKFREE_MAILOPS, at which point scan_nackfull_mailops() takes
        #                             over and accumulates a list of groups.
        #
        fun scan (BASE_MAILOPS l)                                                                                                                       # Common simple case, handled without entering scan'.
                =>
                NACKFREE_MAILOPS l;

            scan mailop
                =>
                scan' mailop
                where
                    fun scan' (DYNAMIC_MAILOP  make_mailop)
                            =>
                            scan' (make_mailop ());

                        scan' (DYNAMIC_MAILOP_WITH_NACK f)
                            =>
                            {   condvar = itt::CONDITION_VARIABLE (REF (itt::CONDVAR_IS_NOT_SET []));                                                   # Fresh condvar, initially unset, with nobody waiting on it.
                                #
                                WITHNACK_MAILOP  (condvar,  scan' (f (wait_on_condvar'  condvar)));
                            };

                        scan' (BASE_MAILOPS  mailops)
                            =>
                            NACKFREE_MAILOPS  mailops;

                        scan' (CHOOSE_MAILOP  mailops)
                            =>
                            scan_mailops (mailops, [])                                                                                                  # Optimistically assume nackfree; we'll fall back to  scan_nackfull_mailops()  if we're wrong.
                            where
                                fun scan_mailops ([], mailops)
                                        =>
                                        NACKFREE_MAILOPS mailops;

                                    scan_mailops (mailop ! rest, mailops')                                                                              # mailops' is the pool of base mailops collected so far.
                                        =>
                                        case (scan' mailop)
                                            #
                                            NACKFREE_MAILOPS mailops =>  scan_mailops          (rest,  mailops @                   mailops' );
                                            NACKFULL_MAILOPS mailops =>  scan_nackfull_mailops (rest,  mailops @ [NACKFREE_MAILOPS mailops']);
                                            /*        */     mailops =>  scan_nackfull_mailops (rest, [mailops,   NACKFREE_MAILOPS mailops']);          # Remaining possibility is WITHNACK_MAILOP.
                                        esac;
                                end

                                also
                                fun scan_nackfull_mailops ([], [group]) =>  group;                                                                      # The general case vs scan_mailops above handling the nice simple (and common) case.
                                    scan_nackfull_mailops ([], l)       =>  NACKFULL_MAILOPS l;
                                    #
                                    scan_nackfull_mailops (mailop ! rest, l)
                                        =>
                                        case (scan' mailop, l)
                                            #                                 
                                            (NACKFREE_MAILOPS mailops, NACKFREE_MAILOPS mailops' ! rest')                                               # Merge adjacent nackfree groups into one.
                                                =>
                                                scan_nackfull_mailops (rest, NACKFREE_MAILOPS (mailops @ mailops') ! rest');

                                            (NACKFULL_MAILOPS mailops, l) =>   scan_nackfull_mailops (rest, mailops @ l);
                                            (                 mailops, l) =>   scan_nackfull_mailops (rest, mailops ! l);                               # Here 'mailops' can be NACKFREE_MAILOPS or WITHNACK_MAILOP.
                                       esac;
        end;    end;  end;  end;end;


        stipulate
            #
            count =  REF 0;                                     # Runs circularly around range 0..999,999
            #
            fun pick_fairly i                                   # The point here is just to pick fairly among 'i' ready mailops,
                =                                               # so that every ready mailop gets a chance to fire reasonably soon.
                {   j =  *count;
                    #
                    if (j >= 1000000)   count := 0;             # No locking needed because we run on a single hostthread
                    else                count := j+1;           # and microthread pre-emption is done only at start of a fn.
                    fi;

                    int::rem (j, i);
                };
        herein
            #
            fun fairly_pick_mailop_to_fire [ fire_mailop ]
                    =>
                    fire_mailop;                                                                                                                        # Only one choice -- easy work!

                fairly_pick_mailop_to_fire  fire_mailop_fns
                    =>
                    list::nth (fire_mailop_fns, pick_fairly (length fire_mailop_fns));
             end;
        end;
        #
        fun make__do1mailoprun_status__and__finish_do1mailoprun ()
            =
            {   do1mailoprun_status =    REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_microthread()));
                #
                finish_do1mailoprun    =    {.   do1mailoprun_status :=  itt::DO1MAILOPRUN_IS_COMPLETE;  };

                { do1mailoprun_status, finish_do1mailoprun };
            };


        stipulate
            # When we have exactly one mailop in the do_one_mailop[...] we can use simple logic:
            #
            # Run a single base mailop:
            #
            #   1) Check that we are not already in an uninterruptible scope, then enter one
            #      by setting log::uninterruptible_scope_mutex to 1.
            #   2) Poll the mailop.
            #   3) If it is ready, fire it immediately and return its result.
            #      Otherwise hand its suspend fn a fresh do1mailoprun_status / finish_do1mailoprun
            #      pair, with mps::dispatch_next_thread__xu__noreturn as the fn it calls
            #      once it has queued itself.
            #
            fun do_lone_mailop  (is_mailop_ready_to_fire:  Base_Mailop(X))
                =
                {
                                                                                                                                                        mps::assert_not_in_uninterruptible_scope "do_lone_mailop";
                    log::uninterruptible_scope_mutex := 1;                                                                                              # Begin uninterruptible scope.
                    #
                    case (is_mailop_ready_to_fire ())
                        #             
                        READY_MAILOP { fire_mailop }                                                                                                    # Reppy refers to 'fire_mailop' as 'doFn'.
                            =>
                            fire_mailop ();

                        UNREADY_MAILOP  suspend_then_eventually_fire_mailop
                            =>
                            {   (make__do1mailoprun_status__and__finish_do1mailoprun ())
                                    ->
                                    { do1mailoprun_status, finish_do1mailoprun };
                                #
                                suspend_then_eventually_fire_mailop
                                  { do1mailoprun_status,
                                    finish_do1mailoprun,
                                    return_to__suspend_then_eventually_fire_mailops__loop
                                        =>
                                        mps::dispatch_next_thread__xu__noreturn                                                                         # Since we have only one mailop, we do not need to actually loop here.
                                  };
                            };
                    esac;
                };

            # Outcome of polling a list of base mailops:
            # either the suspend fns of the unready mailops
            # or the fire fns of the ready ones.
            # NOTE(review): the polling code which builds these values is below the visible part of the file -- confirm exact usage there.
            #
            Test_Mailops_For_Readiness_To_Fire__Result(X)
                #
                = NO_READY_MAILOPS { start_mailop_watch__fns:   List (itt::Suspend_Then_Eventually_Fire_Mailop__Fn(X)) }
                |    READY_MAILOPS { fire_mailop_fns:           List (Void -> X) }
                ;


        herein

            # Pick and fire exactly one mailop from a list of
            # mailop thunks, for the case where no negative
            # acknowledgements are involved.
            #
            # Each list element is an is_mailop_ready_to_fire() fn.
            # Three cases are distinguished:
            #
            #   []         No mailop can ever fire (this is how NEVER is handled):
            #              We just dispatch the next thread.
            #
            #   [mailop]   Delegated to do_lone_mailop.
            #
            #   otherwise  Poll every mailop.  If any are ready, pick one via
            #              fairly_pick_mailop_to_fire and fire it.  Otherwise hand each
            #              mailop's suspend fn the shared do1mailoprun state, then
            #              dispatch the next thread.
            #
            fun do_nackfree_mailops []       =>  mps::dispatch_next_thread__noreturn ();                                                                # 'do_one_mailop' with empty rule list:  No mailop can ever fire,
                #                                                                                                                                       # so there is no way to ever make progress -- end of thread!
                #                                                                                                                                       # XXX BUGGO FIXME Should we call some thread_exit type fn here?
                #                                                                                                                                       #                 Or throw an exception or log something?
                #                                                                                                                                       #                 Or at LEAST set MICROTHREAD.state to non-itt::state::ALIVE?
                #                                                                                                                                       #                 Just hanging is going to be bloody mysterious to debug.
                #
                do_nackfree_mailops [mailop] =>  do_lone_mailop  mailop;                                                                                # This is the only call to   do_lone_mailop().

                do_nackfree_mailops  mailops
                    =>
                    {
                                                                                                                                                        mps::assert_not_in_uninterruptible_scope "do_nackfree_mailops";
                        log::uninterruptible_scope_mutex := 1;                                                                                          # Start uninterruptible scope. (Aka "critical section", "atomic region" etc.)
                        #
                        case (test_mailops_for_readiness_to_fire (mailops, []))                                                                         # Poll every mailop in the list exactly once.
                            #
                            READY_MAILOPS { fire_mailop_fns }
                                =>
                                {   mailop_to_fire =   fairly_pick_mailop_to_fire  fire_mailop_fns;                                                     # Pick a do_one_mailop[....] mailop to fire ...
                                    mailop_to_fire ();                                                                                                  # ... and then fire it.
                                };

                            NO_READY_MAILOPS { start_mailop_watch__fns }
                                =>
                                call_with_current_control_fate                                                                                          # Capture the current fate: the loop below passes each suspend fn's result to it via switch_to_control_fate.
                                    #
                                    (\\ fate =  {   run__suspend_then_eventually_fire_mailops__loop__noreturn  start_mailop_watch__fns;                 error "[run__suspend_then_eventually_fire_mailops__loop__noreturn returned?!]"; }
                                        where                                                                                                           # Execution should never reach above.
                                            switch_to_control_fate =  switch_to_control_fate  fate;
                                            #
                                            (make__do1mailoprun_status__and__finish_do1mailoprun ())                                                    # One status/finish pair is made here and handed to every suspend fn in this run.
                                                ->
                                                { do1mailoprun_status, finish_do1mailoprun };
                                            #
                                            fun run__suspend_then_eventually_fire_mailops__loop__noreturn []                                            # We have now set up ready watches on all mailops, so
                                                    =>                                                                                                  # let the next ready-to-run microthread run while
                                                    mps::dispatch_next_thread__xu__noreturn ();                                                         # we wait for one of these mailops to become ready to fire.
                                                #
                                                run__suspend_then_eventually_fire_mailops__loop__noreturn  (suspend_then_eventually_fire_mailop ! remaining_mailops)
                                                    =>
                                                    switch_to_control_fate                                                                              # Whatever the suspend fn eventually returns is passed on to 'fate'.
                                                        #
                                                        (suspend_then_eventually_fire_mailop                                                            # Set up ready-watch for this particular mailop.
                                                          { do1mailoprun_status,
                                                            finish_do1mailoprun,
                                                            return_to__suspend_then_eventually_fire_mailops__loop                                       # maildrop.pkg, mailslot.pkg etc call this to return control to us.
                                                                =>
                                                                {.  run__suspend_then_eventually_fire_mailops__loop__noreturn  remaining_mailops;  }    # Set up ready watches on remaining mailops. 
                                                          }
                                                        );
                                            end;
                                        end
                                    );
                            
                        esac;
                    }
                    where
                        # Poll the mailops in list order.  While none has been found ready we
                        # accumulate their suspend fns (by prepending, so the accumulated list is
                        # in reverse polling order).  On the first ready mailop we switch to the
                        # primed loop below, dropping the suspend fns accumulated so far.
                        #
                        fun test_mailops_for_readiness_to_fire (is_mailop_ready_to_fire ! rest,  start_mailop_watch__fns)                               # In this loop we have not yet found any mailops ready to fire.
                                =>
                                case (is_mailop_ready_to_fire ())
                                    #
                                    UNREADY_MAILOP  suspend_then_eventually_fire_mailop
                                        =>
                                        test_mailops_for_readiness_to_fire   (rest, suspend_then_eventually_fire_mailop ! start_mailop_watch__fns);

                                    READY_MAILOP      { fire_mailop }
                                        =>
                                        test_mailops_for_readiness_to_fire'  (rest, [ fire_mailop ]);
                                esac;

                            test_mailops_for_readiness_to_fire ([], start_mailop_watch__fns)                                                            # Done -- no ready-to-fire mailops found and no candidates left to check.
                                =>
                                NO_READY_MAILOPS { start_mailop_watch__fns };                                                                           # No mailops were ready to fire; return list of fns which
                        end                                                                                                                             # each start a watch on one mailop for readiness to fire.

                        also
                        fun test_mailops_for_readiness_to_fire' (is_mailop_ready_to_fire ! rest,  fire_mailop_fns)                                      # In this loop we have found at least one mailop which is ready to fire.
                                =>
                                case (is_mailop_ready_to_fire ())                                                                                       # The remaining mailops are still polled, but only ready ones are kept.
                                    #
                                    READY_MAILOP { fire_mailop }
                                        =>  test_mailops_for_readiness_to_fire' (rest,   fire_mailop ! fire_mailop_fns);
                                    _   =>  test_mailops_for_readiness_to_fire' (rest,                 fire_mailop_fns);
                                esac;

                            test_mailops_for_readiness_to_fire' ([],  fire_mailop_fns)
                                =>
                                 READY_MAILOPS { fire_mailop_fns };                                                                                     # At least one mailop was ready to fire; return the fns which
                                                                                                                                                        # will fire the ready-to-fire mailops.

                        end;                                                                                                                            # fun test_mailops_for_readiness_to_fire'

                                                                                                                                                        # NOTE: Maybe we should just keep
                                                                                                                                                        #       track of the max priority above?
                                                                                                                                                        #       What about fairness to fixed
                                                                                                                                                        #       priority mailops (e::g., always, timeout?)

                    end;                                                                                                                                # where
            end;                                                                                                                                        # fun do_nackfree_mailops
        end;                                                                                                                                            # stipulate

        stipulate
            # Walk the mailop group tree,
            # collecting the base mailops
            # (with associated ack flags),
            # also a list of nackstates.
            #
            # A nackstate is a
            #     (Condvar, List(Flag(Ack)))
            # pair, where the flags are
            # those associated with the
            # mailops covered by the nack
            # condvar.
            #
            fun gather_base_mailops_and_nackstates  mailops
                =
                case mailops
                    #
                    NACKFULL_MAILOPS _
                        =>
                        gather_mailops (mailops, [], [])
                        where 
                            un_wrapped_flag =  REF FALSE;
                            #
                            fun reverse_and_prepend_mailops  (mailop ! rest,  results) =>  reverse_and_prepend_mailops  (rest,  (mailop, un_wrapped_flag) ! results);
                                reverse_and_prepend_mailops  (           [],  results) =>  results;
                            end;
                            #
                            fun gather_mailops (NACKFREE_MAILOPS nackfree_mailops, bl, nackstates)
                                    =>
                                    (reverse_and_prepend_mailops (nackfree_mailops, bl), nackstates);

                                gather_mailops (NACKFULL_MAILOPS nackfull_mailops, bl, nackstates)
                                    =>
                                    fold_forward  f  (bl, nackstates)  nackfull_mailops
                                    where
                                        fun f (group', (bl', nackstates'))
                                            =
                                            gather_mailops (group', bl', nackstates');
                                    end;

                                gather_mailops (withnack as WITHNACK_MAILOP _, bl, nackstates)
                                    =>
                                    gather_wrapped (withnack, bl, nackstates);
                            end;
                        end;

                    group =>   gather_wrapped (group, [], []);
                esac
                where
                    fun gather_wrapped (group, bl, nackstates)
                            =
                            {    (gather (group, bl, [], nackstates))
                                    ->
                                    (bl, _, nackstates);

                                (bl, nackstates);
                            }
                            where
                                fun gather (NACKFREE_MAILOPS mailops, bl, all_flags, nackstates)
                                        =>
                                        {   (reverse_and_prepend_mailops (mailops, bl, all_flags))
                                                ->
                                                (bl', all_flags');

                                            (bl', all_flags', nackstates);
                                        }
                                        where
                                            fun reverse_and_prepend_mailops ([],  bl,  all_flags)
                                                    =>
                                                    (bl, all_flags);

                                                reverse_and_prepend_mailops (mailop ! rest,  bl,  all_flags)
                                                    =>
                                                    {   flag = REF FALSE;
                                                        #
                                                        reverse_and_prepend_mailops  (rest,  (mailop, flag) ! bl,  flag ! all_flags);
                                                    };
                                            end;
                                        end;


                                    gather (NACKFULL_MAILOPS group, bl, all_flags, nackstates)
                                        =>
                                        fold_forward  f  (bl, all_flags, nackstates)  group
                                        where
                                            fun f (group', (bl', all_flags', nackstates'))
                                                =
                                                gather (group', bl', all_flags', nackstates');
                                        end;

                                    gather (WITHNACK_MAILOP (condvar, group), bl, all_flags, nackstates)
                                        =>
                                        {   (gather (group, bl, [], nackstates))
                                                ->
                                                (bl', all_flags', nackstates');

                                            ( bl',
                                              all_flags' @ all_flags,
                                              (condvar, all_flags')  !  nackstates'
                                            );
                                        };
                                end;                                                                    # fun gather
                            end;                                                                        # where
                end;                                                                                    # where

            # Result of polling a list of (mailop, flag) pairs for readiness to fire.
            # Same shape as the nack-free variant, except that each fn is paired
            # with the Ref(Bool) flag of the mailop it came from:
            #
            #   NO_READY_MAILOPS   No polled mailop was ready.  Carries (suspend fn, flag) pairs.
            #      READY_MAILOPS   At least one polled mailop was ready.  Carries (fire fn, flag) pairs
            #                      for the mailops which reported themselves ready.
            #
            Test_Mailops_For_Readiness_To_Fire__Result(X)
                #
                = NO_READY_MAILOPS { start_mailop_watch__fns:   List ((itt::Suspend_Then_Eventually_Fire_Mailop__Fn(X), Ref(Bool))) }
                |    READY_MAILOPS { fire_mailop_fns:           List ( ((Void -> X), Ref(Bool)) ) }
                ;

        herein
            # Pick and fire exactly one mailop from a mailop group
            # tree in which negative acknowledgements are involved.
            #
            # The tree is first flattened (see the 'where' clause at the bottom)
            # into a list of (is_mailop_ready_to_fire, flag) pairs plus a list of
            # nackstates.  We then poll every mailop:
            #
            #   o If any are ready we pick one via fairly_pick_mailop_to_fire, set its
            #     flag, send nacks as appropriate, and fire it.
            #
            #   o Otherwise we hand each mailop's suspend fn the shared
            #     do1mailoprun_status plus a finish fn which, when called, marks the
            #     run complete, sets that mailop's flag and sends nacks as appropriate.
            #     Then we dispatch the next thread.
            #
            fun do_nackfull_mailops  group
                =
                {
                                                                                                                                                mps::assert_not_in_uninterruptible_scope "do_nackfull_mailops";
                    log::uninterruptible_scope_mutex := 1;                                                                                      # Start uninterruptible scope (aka "critical section" aka "atomic region"...)
                    #
                    case (test_mailops_for_readiness_to_fire (mailops, []))                                                                     # Poll every mailop in the flattened list exactly once.
                        #
                        READY_MAILOPS { fire_mailop_fns }
                            =>
                            {   (fairly_pick_mailop_to_fire  fire_mailop_fns)
                                    ->
                                    (fire_mailop, flag);

                                flag := TRUE;                                                                                                   # Must precede send_nacks_as_appropriate(), which tests these flags.

                                send_nacks_as_appropriate ();

                                fire_mailop ();
                            };

                        NO_READY_MAILOPS { start_mailop_watch__fns }
                            =>
                            call_with_current_control_fate                                                                                      # Capture the current fate: the loop below passes each suspend fn's result to it via switch_to_control_fate.
                                (\\ fate
                                    =
                                    {   run__suspend_then_eventually_fire_mailops__loop__noreturn  start_mailop_watch__fns;                     # Ask each mailop in the do_one_mailop [...] list to enqueue itself as appropriate. 
                                        #
                                        error "[run__suspend_then_eventually_fire_mailops__loop__noreturn returned?!]";                         # This cannot happen -- run__suspend_then_eventually_fire_mailops__loop__noreturn() runs next thread when done.
                                    }
                                    where
                                        switch_to_control_fate =  switch_to_control_fate  fate;
                                        #
                                        do1mailoprun_status =   REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_microthread ()));           # 'do1mailoprun_status' is basically a mutex ensuring exactly one mailop fires per do1mailoprun.
                                        #
                                        fun finish_do1mailoprun_fn  flag  ()                                                                    # This will be called by the first mailop to fire. 
                                            =
                                            {   do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
                                                flag := TRUE;                                                                                   # Set before sending nacks, as in the READY_MAILOPS case above.
                                                send_nacks_as_appropriate ();
                                            };
                                        #
                                        fun run__suspend_then_eventually_fire_mailops__loop__noreturn  []                                       # All suspend fns have been called: Let the next thread run.
                                                =>
                                                mps::dispatch_next_thread__xu__noreturn ();
                                            #
                                            run__suspend_then_eventually_fire_mailops__loop__noreturn  ((suspend_then_eventually_fire_mailop, flag)  !  rest)
                                                =>
                                                switch_to_control_fate                                                                          # Whatever the suspend fn eventually returns is passed on to 'fate'.
                                                    (
                                                        suspend_then_eventually_fire_mailop
                                                          {
                                                            do1mailoprun_status,
                                                            finish_do1mailoprun =>  finish_do1mailoprun_fn flag,                                # Each mailop gets a finish fn closed over its own flag.
                                                            return_to__suspend_then_eventually_fire_mailops__loop                               # maildrop.pkg, mailslot.pkg etc call this to return control to us.
                                                                =>
                                                                \\ () =  run__suspend_then_eventually_fire_mailops__loop__noreturn  rest
                                                          }
                                                    );
                                        end;                                                                                                    # fun run__suspend_then_eventually_fire_mailops__loop__noreturn
                                    end                                                                                                         # where
                                );

                    esac;       
                }
                where
                    (gather_base_mailops_and_nackstates  group)                                                                                 # Flatten the group tree into (mailop, flag) pairs plus (condvar, flags) nackstates.
                        ->
                        (mailops, nackstates);
                    #
                    fun send_nacks_as_appropriate ()
                        =
                        apply  check_condvar  nackstates                                                                                        # NB: We capture 'nackstates' here for later use in 'finish_do1mailoprun_fn'.
                        where
                            # check_condvar checks the flags of a nackstate.
                            # If they are all FALSE then the
                            # corresponding condvar is set to signal
                            # the negative ack.
                            #
                            fun check_condvar (condvar, flags)
                                =
                                check_flags flags
                                where
                                    fun check_flags ((REF TRUE) ! _) =>  ();                                                                    # A mailop covered by this condvar was chosen: No nack.
                                        check_flags (_ ! rest)       =>  check_flags  rest;
                                        check_flags []               =>  set_condvar__iu  condvar;                                              # No covered mailop was chosen (also the case for an empty flag list).
                                    end;
                                end;
                        end;
                    #
                    fun test_mailops_for_readiness_to_fire ((is_mailop_ready_to_fire, flag) ! rest,   start_mailop_watch__fns)                  # In this loop we have not yet found a mailop ready to fire.
                            =>
                            case (is_mailop_ready_to_fire ())
                                #
                                  READY_MAILOP    { fire_mailop }                       =>  test_mailops_for_readiness_to_fire' (rest, [ (fire_mailop, flag)]);
                                UNREADY_MAILOP  suspend_then_eventually_fire_mailop     =>  test_mailops_for_readiness_to_fire  (rest, (suspend_then_eventually_fire_mailop, flag) ! start_mailop_watch__fns);
                            esac;

                        test_mailops_for_readiness_to_fire ([], start_mailop_watch__fns)                                                        # Done -- no ready-to-fire mailops found and none left to check,
                            =>                                                                                                                  # so we must block until some mailop becomes ready to fire.
                            NO_READY_MAILOPS { start_mailop_watch__fns };
                    end

                    also
                    fun test_mailops_for_readiness_to_fire' ((is_mailop_ready_to_fire, flag) ! rest,  fire_mailop_fns)                          # In this loop we have found at least one do_one_mailop[...] mailop ready to fire.
                            =>
                            case (is_mailop_ready_to_fire ())                                                                                   # The remaining mailops are still polled, but only ready ones are kept.
                                #
                                READY_MAILOP { fire_mailop }
                                    =>
                                    test_mailops_for_readiness_to_fire'  (rest,  (fire_mailop, flag) ! fire_mailop_fns);

                                _   =>   test_mailops_for_readiness_to_fire'  (rest,  fire_mailop_fns);
                            esac;

                        test_mailops_for_readiness_to_fire'  ([],  fire_mailop_fns)                                                             # Done -- return the ready mailops; our caller picks one to fire and then fires it.
                            =>
                            READY_MAILOPS { fire_mailop_fns };
                    end;
                                                                                                                                                # NOTE: Maybe above we should just
                                                                                                                                                # keep track of the max priority?
                                                                                                                                                # What about fairness to fixed
                                                                                                                                                # priority mailops (e::g., always, timeout?)
                                                                                                                                                #

                end;                                                                                                                            # fun do_nackfull_mailops
        end;                                                                                                                                    # stipulate


        #
        fun block_until_mailop_fires  mailop                                                                                                    # PUBLIC.
            =
            case (scan mailop)
                #
                NACKFREE_MAILOPS mailops =>  do_nackfree_mailops  mailops;
                /*            */ mailops =>  do_nackfull_mailops  mailops;
            esac;


                                                                                                                                                # 'do_one_mailop' is our core entrypoint, used by clients to do
                                                                                                                                                # handle-multiple-mail-sources-style thread I/O via
                                                                                                                                                # statements looking like
                                                                                                                                                #
                                                                                                                                                #     do_one_mailop [
                                                                                                                                                #         foo' ==> {. do_this (); },
                                                                                                                                                #         bar' ==> {. do_that (); },
                                                                                                                                                #         ...
                                                                                                                                                #     ];
                                                                                                                                                #
                                                                                                                                                # We have two main cases:
                                                                                                                                                #  1) If one or more mailops in the list is ready to fire, we pick one and fire it.
                                                                                                                                                #  2) If no mailop in the list is ready to fire, we must block until one becomes ready, then continue as in 1).

        fun do_one_mailop  mailops                                                                                                              # PUBLIC.
            =
            case (scan_mailops (mailops, []))
                #
                NACKFREE_MAILOPS mailops  =>   do_nackfree_mailops  mailops;                                                                    # This is special-case handling for simple special case of no WITH_NACK mailops.
                /*            */ mailops  =>   do_nackfull_mailops  mailops;                                                                    # This is the general case.
            esac
            where
                # Preparation.  During this phase we need to:
                #
                #  o Expand DYNAMIC_MAILOP                                                                                                      # These are essentially hacks to allow generating
                #    and    DYNAMIC_MAILOP_WITH_NACK clauses.                                                                                   # mailops on the fly while 'do_one_mailop' is running.
                #
                #  o Figure out whether we have any WITH_NACK clauses,
                #    which complicate things.
                #
                #  In the common case of no WITH_NACK mailops
                #  we return NACKFREE_MAILOPS _, otherwise    
                #  we return NACKFULL_MAILOPS _ or WITHNACK_MAILOP _.
                # 
                # scan_mailops walks the given mailop list for as long as every scanned clause
                # has turned out to be nack-free.  'mailops' accumulates the base mailops found
                # so far; each new nack-free batch is prepended to it.  As soon as a clause scans
                # to NACKFULL_MAILOPS or WITHNACK_MAILOP we switch over to scan_nackfull_mailops,
                # handing it the accumulated base mailops wrapped as one NACKFREE_MAILOPS group.
                #
                fun scan_mailops  (mailop ! rest,  mailops)                                                                                     # scan_mailops          handles the nice simple nack-free case;
                        =>                                                                                                                      # scan_nackfull_mailops handles the case where one or more WITH_NACK clauses are present.
                        case (scan_one_mailop  mailop)
                            #
                            /* */ NACKFREE_MAILOPS mailops'  =>  scan_mailops           (rest,  mailops' @                    mailops  );
                            /* */ NACKFULL_MAILOPS mailops'  =>  scan_nackfull_mailops  (rest,  mailops' @ [ NACKFREE_MAILOPS mailops ]);
                            wm as WITHNACK_MAILOP   _        =>  scan_nackfull_mailops  (rest, [ wm,         NACKFREE_MAILOPS mailops ]);
                        esac;

                    scan_mailops ([], mailops)       => NACKFREE_MAILOPS  mailops;                              # Done!
                end

                also
                # scan_nackfull_mailops continues the scan once at least one nack clause has been seen.
                # 'results' is a list of already-scanned groups.  When the input is exhausted, a
                # single accumulated group is returned as-is; otherwise the groups are wrapped
                # in NACKFULL_MAILOPS.
                #
                fun scan_nackfull_mailops ([], [result]) =>  result;
                    scan_nackfull_mailops ([], results)  =>  NACKFULL_MAILOPS results;
                    #
                    scan_nackfull_mailops (mailop ! rest,  results)
                        =>
                        case (scan_one_mailop  mailop,  results)
                            #
                            (NACKFREE_MAILOPS mailops,  NACKFREE_MAILOPS mailops' ! rest')                                                      # New nack-free batch and head of 'results' is also nack-free:  merge them into one group.
                                =>
                                scan_nackfull_mailops (rest, NACKFREE_MAILOPS (mailops @ mailops') ! rest');

                            (NACKFULL_MAILOPS mailops, results) =>   scan_nackfull_mailops  (rest,  mailops @ results);                         # Splice the nested groups directly into 'results' rather than nesting NACKFULL_MAILOPS.
                            (other,                    results) =>   scan_nackfull_mailops  (rest,    other ! results);                         # 'other' can be (NACKFREE_MAILOPS _) or (WITHNACK_MAILOP _).
                        esac;
                end

                also
                # scan_one_mailop classifies a single mailop value:
                #   BASE_MAILOPS              -> already a flat list of base mailops, hence nack-free.
                #   CHOOSE_MAILOP             -> scan the contained list, starting from an empty accumulator.
                #   DYNAMIC_MAILOP            -> call the generator fn, then scan whatever it returns.
                #   DYNAMIC_MAILOP_WITH_NACK  -> see below.
                #
                fun scan_one_mailop (BASE_MAILOPS       mailops)     =>   NACKFREE_MAILOPS mailops;
                    scan_one_mailop (CHOOSE_MAILOP      mailops)     =>   scan_mailops (mailops, []);
                    scan_one_mailop (DYNAMIC_MAILOP     make_mailop) =>   scan_one_mailop (make_mailop ());                                     # Generate a do_one_mailop[...] mailop on the fly, then recursively scan it.
                    #
                    scan_one_mailop (DYNAMIC_MAILOP_WITH_NACK make_mailop)                                                                      # Like DYNAMIC_MAILOP but with a nack mailop to signal client-code abort.
                        =>
                        {   condvar =  itt::CONDITION_VARIABLE  (REF  (itt::CONDVAR_IS_NOT_SET []));                                            # Fresh condition variable, created in the not-set state.
                            #
                            WITHNACK_MAILOP  (condvar,  scan_one_mailop  (make_mailop  (wait_on_condvar'  condvar)));                           # Generator is given a mailop built from the condvar (presumably the nack mailop -- confirm against wait_on_condvar'); its scanned result is paired with the condvar.
                        };

                end;
            end;                        # fun do_one_mailop





        Replyqueue_Entry                                                                        # One pending reply.  See Note[1] below.
          =
          { id:         Int,                                                                    # The 'id' field is so that we can unambiguously remove an entry from a replyqueue -- Mailops are not an equality type.
            op:         Mailop(Void)                                                            # This will be the oneshot for the reply from another imp, together with our logic for handling that reply.
          };
        Replyqueue
          =
          { next_id:    Ref(Int),                                                               # Used to generate Replyqueue_Entry.id values.
            queue:      Ref( List( Replyqueue_Entry ) )                                         # The reply queue proper, a list of reply oneshots wrapped in handling logic.
          };

        fun make_replyqueue ()
            =
            { next_id => REF 1,
              queue   => REF []
            };


        fun put_in_replyqueue                                                                   # Remember that we're expecting a reply to a request made of an external imp.
              (
                q:      Replyqueue,                                                             # This is our imp-private reply queue.
                op:     Mailop(Void)                                                            # This is the mailop which will fire when our reply arrives.
              ) 
            =
            {   id =  *q.next_id;       q.next_id := id + 1;                                    # Allocate a fresh id for the nascent replyqueue entry.
                #
                op =  (op ==> (\\ x =  {  drop_from_replyqueue { q, id };  x;  }));             # Wrap given mailop so that when it fires it will delete its replyqueue entry.

                q.queue :=   { id, op }  !  *q.queue;                                           # Add entry to replyqueue.
            }
            where
                fun drop_from_replyqueue  { q: Replyqueue,  id: Int }                           # Drop from replyqueue the entry with id==my_id.
                    =
                    q.queue :=  drop_it (*q.queue, [])
                    where
                        fun drop_it ((queue_entry as { id => id', op }) ! rest, result)         # Copy given list, dropping id' from it.
                                =>
                                if (id' == id)   drop_it (rest,                 result);        # This is the queue entry to drop -- ignore it, but copy rest of list.
                                else             drop_it (rest,   queue_entry ! result);        # Not our queue entry -- copy it to result list.
                                fi;

                            drop_it ([], result) =>   result;                                   # Yes, result is reversed relative to input list. We don't care.
                        end;
                    end;
            end;


        fun do_one_mailop'  (q: Replyqueue)  mailops                                            # Just like do_one_mailop, except it also processes the replyqueue of pending replies from other imps.
            =
            do_one_mailop  (prepend_replyqueue_mailops (*q.queue, mailops))                     # Augment the mailops list and then pass the buck to the vanilla do_one_mailop().
            where
                fun prepend_replyqueue_mailops ([], result)                                     # Prepend replyqueue contents to 'mailops' and return result.
                        =>
                        result;

                    prepend_replyqueue_mailops  ({ id, op }  !  rest,   result)                 # 
                        =>
                        prepend_replyqueue_mailops  (rest,   op ! result);                      # 
                end;
            end;

        fun replyqueue_to_string ({ next_id, queue => REF q }, name)
            =
            sprintf "{|RQ|:%s =%d}" name (length q);

    };                                                                                          # package mailop
end;

##############################################################################################
# Note[1]
#
# To avoid deadlock, it is critically important that each imp
# write to message queues (a write to a message queue cannot block)
# and read via a single call (ensuring it cannot be blocked on one
# read call while another has input ready to process).
#
# Obtaining a value from another imp is done by putting a request in a
# request queue together with a oneshot maildrop for the reply.
#     Our imp then needs to keep a list of such oneshot-maildrops
# (its replyqueue) and scan it as part of our mailop input statement
# (do_one_mailop []).
#     Any oneshot which fires during such a read statement needs to
# be dropped from that replyqueue, since it will never fire again
# and thus is just clutter and a potential memory leak.
#
# Note that since a refcell is equal to itself and nothing else,
# we could use Ref(Void) refcells for the id type, thus avoiding
# the need for next_id and the risk of Int wrap-around.  I'm using
# Int anyhow because it simplifies debugging by allowing logging
# of more readable traces.


## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext