PreviousUpNext

15.4.903  src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg

## mailop.pkg
#
# Implementation of mailop values and the mailop combinators.
#
# Some important requirements on the implementation
# of base mailop values:
#
#  1)  An  is_mailop_ready_to_fire(), fire_mailop()
#      or set_up_mailopready_watch() fn
#      is always called from inside an uninterruptible scope.                   # "uninterruptible scope" == "critical section" == "atomic region" == ...
#                                                                               # In practice it means that microthread_preemptive_scheduler::thread_scheduler_state is either IN_UNINTERRUPTIBLE_SCOPE or
#                                                                               # IN_UNINTERRUPTIBLE_SCOPE_WITH_PENDING_THREADSWITCH, either of which prevents thread switching.
#  2)  An is_mailop_ready_to_fire() returns an integer priority.                
#      This is   0 when not enabled,
#               -1 for fixed priority and
#               >0 for dynamic priority.
#      The standard scheme is to associate a counter
#      with the underlying synchronization value and
#      to increase it by one for each synchronization attempt.
#
#  3)  A set_up_mailopready_watch fn is responsible
#      for ending the uninterruptible scope.
#
#      A fire_mailop should NOT end the uninterruptible scope.
#
#  4)  Each set_up_mailopready_watch fn is responsible for executing
#      the "finish_do1mailoprun" action prior to ending the uninterruptible scope.

# Compiled by:
#     src/lib/std/standard.lib



###                       "Another roof, another proof."
###
###                                     -- Paul Erdos 





stipulate
    package fat =  fate;                                                        # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                                    # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package rwq =  rw_queue;                                                    # rw_queue                              is from   src/lib/src/rw-queue.pkg
    package mps =  microthread_preemptive_scheduler;                            # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
herein

    package mailop: (weak)
                            api {
                                include Mailop;                                 # Mailop                                is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.api
                                #
                                set_condvar__iu:   itt::Condition_Variable -> Void;
                                wait_on_condvar':                       itt::Condition_Variable -> Mailop( Void );      # Exported for use in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
                            }
    {
        call_with_current_control_fate  =  fat::call_with_current_control_fate;
        switch_to_control_fate          =  fat::switch_to_control_fate;
        call_with_current_fate          =  fat::call_with_current_fate;
#       switch_to_fate                  =  fat::switch_to_fate;

        # Some inline functions
        # to improve performance:
        #
        fun map f
            =
            map'
            where
                fun map' [] => [];
                    map' [a] => [f a];
                    map' [a, b] => [f a, f b];
                    map' [a, b, c] => [f a, f b, f c];
                    map' (a ! b ! c ! d ! r) => (f a) ! (f b) ! (f c) ! (f d) ! (map' r);
                end;
            end;
        #
        fun apply f
            =
            apply'
            where
                fun apply' [] => ();
                    apply' (x ! r) => { f x; apply' r;};
                end;
            end;
        #
        fun fold_forward f init l
            =
            foldf (l, init)
            where
                fun foldf ([], accum) => accum;
                    foldf (x ! r, accum) => foldf (r, f (x, accum));
                end;
            end;
        #
        fun error msg
            =
            raise exception FAIL msg;

        Mailop_Readiness   ==  itt::Mailop_Readiness;
        Mailop             ==  itt::Mailop;
        Base_Mailop(X) =  itt::Base_Mailop(X);


        # Condition variables.
        #
        # Because these variables are set inside
        # atomic regions we have to use different
        # conventions for clean-up, etc.  Instead
        # of requiring the set_up_mailopready_watch fate
        # to call the finish_do1mailoprun fn and to end
        # the uninterruptible scope, we call the finish_do1mailoprun
        # function when setting the condition variable
        # (in set_condvar__iu), and have the invariant
        # that the set_up_mailopready_watch fate is dispatched
        # outside the atomic region.


                                                                        # Nomenclature: What I'm calling "uninterruptible_scope" is usually called "critical section" or "atomic region"
                                                                        # in the literature.  I dislike "critical" because it is vague. ("critical" in what sense? Who knows?)
                                                                        # "atomic" is literally correct ("a-tomic" == "not cuttable" -- indivisible) but the modern reader is not
                                                                        # likely to take it in that sense at first blush.  And neither "section" nor "region" are as apropos as "scope".
        # Set a condition variable.
        # Caller guarantees that this function is always
        # executed in an uninterruptible scope.
        #
        set_condvar__iu
            =
            mps::set_condvar__iu;

#       fun set_condvar__iu (itt::CONDITION_VARIABLE state)                                             # Only external call is in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
#           =
#           case *state
#               #
#               itt::CONDVAR_IS_NOT_SET  waiting_threads                                                                        # waiting_threads is the list of threads sitting
#                   =>                                                                                                          # blocked waiting for this condvar to be set.
#                   {   mps::foreground_run_queue ->  rwq::RW_QUEUE { back, ... };
#                       #
#                       state :=    itt::CONDVAR_IS_SET  1;                                                                     # Set the condition variable.  NB: The '1' is a priority, not its (non-existent) value.
#                       #
#                       back :=     run  waiting_threads                                                                        # Add to foreground run queue all threads that were waiting for condvar to be set.
#                                   where
#                                       fun run [] =>   *back;
#                                           #
#                                           run ( { do1mailoprun_status=>REF itt::DO1MAILOPRUN_IS_COMPLETE, ... } ! rest)
#                                               =>
#                                               run rest;                                                                       # Drop completed do1mailoprun.
#
#                                           run ( { do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread), finish_do1mailoprun, fate } ! rest)
#                                               =>
#                                               {   do1mailoprun_status :=   itt::DO1MAILOPRUN_IS_COMPLETE;
#                                                   #
#                                                   finish_do1mailoprun ();                                                     # Do stuff like   do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;   and sending nacks.
#
#                                                   (thread, fate)  !  (run rest);
#                                               };
#                                       end;
#                                   end;
#                   };
#
#                _ => error "condvar already set";
#           esac;


        # The mailop constructor for
        # waiting on a condition variable:
        #
        fun wait_on_condvar' (itt::CONDITION_VARIABLE  condvar_state)                                                           # This fn is used (only) below and in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
            =
            BASE_MAILOPS [is_mailop_ready_to_fire]
            where 
                fun set_up_mailopready_watch                                                                                    # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                                                    # Does stuff like  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks.
                        return_to__set_up_mailopready_watches__loop
                      }
                    =
                    call_with_current_fate
                        #
                        (fn fate
                            =
                            {   waiting_threads                                                                                 # The list of threads waiting for the condvar to be set.
                                    =
                                    case *condvar_state
                                        #
                                        itt::CONDVAR_IS_NOT_SET waiting_threads =>   waiting_threads;
                                        itt::CONDVAR_IS_SET _                   =>   raise exception FAIL "Bug in wait_on_condvar'";
                                    esac;                                       #    Above exception should not happen:  is_mailop_ready_to_fire() only queues us up if *condvar_state is not CONDVAR_IS_SET.
                                #
                                waiting_thread =  { do1mailoprun_status,  finish_do1mailoprun,  fate };

                                condvar_state :=  itt::CONDVAR_IS_NOT_SET (waiting_thread ! waiting_threads);                   # Add ourself to list of threads waiting for condvar to be set.

                                return_to__set_up_mailopready_watches__loop ();                                                 # Does not return.
                            }
                        );
                #
                fun is_mailop_ready_to_fire ()                                                                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
                    =
                    case *condvar_state
                        #
                        itt::CONDVAR_IS_SET priority
                            =>
                            {   condvar_state :=   itt::CONDVAR_IS_SET (priority+1);                                            # Increment the condition-variable priority.
                                #
                                MAILOP_IS_READY_TO_FIRE {  priority,  fire_mailop  }
                                where
                                    fun fire_mailop ()                                                                          # Reppy refers to fire_mailop as 'doFn'.
                                        =
                                        {   condvar_state :=  itt::CONDVAR_IS_SET  1;                                           # Set condvar priority back to minimum.
                                            #
                                            log::uninterruptible_scope_mutex := 0;                                              # End uninterruptible scope.
                                        };
                                end;
                            };

                        itt::CONDVAR_IS_NOT_SET _
                            =>
                            MAILOP_IS_NOT_READY_TO_FIRE  set_up_mailopready_watch;
                    esac;
            end;


        # A mailop which is always ready to fire
        # and produces given result:
        #
        fun always'  result                                                                                                     # This is used a lot in (for example)   src/lib/std/src/socket/socket.pkg
            =
            BASE_MAILOPS
              [
                fn () = itt::MAILOP_IS_READY_TO_FIRE
                          { priority    => -1,
                            fire_mailop => fn () =  {   log::uninterruptible_scope_mutex := 0;                                  # Reppy refers to fire_mailop as 'doFn'.
                                                        result;
                                                    }
                          }
              ];

        # A mailop which is never ready to fire:
        #
        never' = BASE_MAILOPS [];                                                                                               # Used in:      src/lib/x-kit/widget/basic/topwindow.pkg
                                                                                                                                #               src/lib/x-kit/xclient/src/window/widget-cable.pkg
                                                                                                                                
        # These generate mailops on-the-fly while 'do_one_mailop' is running.
        # The second is given a mailop with which to detect client
        # abort of the generated mailop:
        #
        dynamic_mailop           =  DYNAMIC_MAILOP;
        dynamic_mailop_with_nack =  DYNAMIC_MAILOP_WITH_NACK;                                                                   # This is mainly used in:   src/lib/std/src/io/winix-text-file-for-os-g.pkg
                                                                                                                                # Also used in:             src/lib/std/src/io/winix-mailslot-io-g.pkg
                                                                                                                                #                           src/lib/std/src/posix/winix-data-file-io-driver-for-posix.pkg
                                                                                                                                #                               

        # Combine a list of mailops into a single mailop:
        #
        fun cat_mailops (mailops:  List(  Mailop(X) ))                                                                          # This gets called in (for example):   src/lib/std/src/io/winix-mailslot-io-g.pkg
            =                                                                                                                   # A frequent idiom is  block_until_mailop_fires (cat_mailops mailops);
            gather (reverse mailops, [])                                                                                        #
            where
                fun gather ([],                               results) =>  BASE_MAILOPS results;                                # Done, return results.
                    #
                    gather (BASE_MAILOPS []       ! rest,  results) =>  gather (rest,           results);
                    gather (BASE_MAILOPS [mailop] ! rest,  results) =>  gather (rest, mailop  ! results);
                    gather (BASE_MAILOPS  mailops ! rest,  results) =>  gather (rest, mailops @ results);
                    #
                    gather (mailops, []) =>  gather' (mailops, []);
                    gather (mailops, l ) =>  gather' (mailops, [BASE_MAILOPS l]);
                end 

                also
                fun gather' ([], [mailop]) =>  mailop;
                    gather' ([], mailops)  =>  CHOOSE_MAILOP mailops;
                    #
                    gather'  (CHOOSE_MAILOP mailops ! rest,  mailops')
                        =>
                        gather' (rest, mailops @ mailops');

                    gather' (BASE_MAILOPS base_mailops ! rest, BASE_MAILOPS base_mailops' ! rest')
                        =>
                        gather' (rest, BASE_MAILOPS (base_mailops @ base_mailops') ! rest');

                    gather' (mailop ! rest, mailops')
                        =>
                        gather' (rest, mailop ! mailops');
                end;
            end;

        fun make_compound_mailop (mailop, added_action)
            =
            # Here we implement the "==>" op used in do_one_mailop [...] rules.
            # This op takes two arguments
            #
            #     mailop:        Mailop(X)
            #     added_action:  X -> Y
            #
            # and from them constructs a new mailop of type
            #
            #                    Mailop(Y)
            #
            # which does exactly what the original mailop did,
            # except that afterwards it also does added_action.
            #
            # Recall that (suppressing a few details) a (base) Mailop                   # See   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
            # is essentially a function                                 
            #
            #      Void -> ( MAILOP_IS_READY_TO_FIRE { fire_mailop: Void -> X }
            #              | MAILOP_IS_NOT_READY_TO_FIRE ({...} -> X)
            #              )
            #
            # The fn that actually does the useful work here is
            #
            #   fire_mailop
            #
            # -- everything else is just bookkeeping etc -- and
            # our job here is basically to replace it by
            #
            #    added_action o fire_mailop
            #
            # Everything else in this fn is just the busywork of
            # iterating over the expression:
            #
            wrap' mailop
            where
                fun wrap_base_mailop  is_mailop_ready_to_fire  ()                       # Note that a mailop *is* an is_mailop_ready_to_fire fn. We hide that externally as an implementation detail, but it becomes visible at this level.
                    =                                                                   # Note also that 'wrap_base_mailop' is CURRIED -- our caller does not immediate supply our () arg, so we *initially* return a thunk that will
                    case (is_mailop_ready_to_fire ())                                   # *eventually* evaluate is_mailop_ready_to_fire() -- we do not do so initially.  The returned thunk is a new mailop wrapping the old mailop.
                        #
                        MAILOP_IS_READY_TO_FIRE { priority, fire_mailop }       =>  MAILOP_IS_READY_TO_FIRE { priority, fire_mailop => added_action o fire_mailop };    # The new fire_mailop value here is what this fn is all about.
                        MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch    =>  MAILOP_IS_NOT_READY_TO_FIRE (added_action o set_up_mailopready_watch);              # Same as above in slightly different setting.
                    esac;
                #
                fun wrap' (BASE_MAILOPS base_mailops)           =>  BASE_MAILOPS (map  wrap_base_mailop  base_mailops);                                                 # Iterate through the base mailops doing the above to them.
                    #
                    wrap' (CHOOSE_MAILOP mailops)               =>  CHOOSE_MAILOP            (map wrap' mailops);                                                       # Iterate through the compound mailops looking for work.
                    #
                    wrap' (DYNAMIC_MAILOP make_mailop)          =>  DYNAMIC_MAILOP           (fn ()     =  make_compound_mailop (make_mailop(), added_action));         # Same core substitution in setting of dynamic mailops.
                    wrap' (DYNAMIC_MAILOP_WITH_NACK f)          =>  DYNAMIC_MAILOP_WITH_NACK (fn mailop =  make_compound_mailop (f mailop,      added_action));         # Same core substitution in setting of dynamic mailops with nacks.
                end;
            end;

        (==>) =  make_compound_mailop;                  # Infix synonym for readability.
        #
        fun make_exception_handling_mailop (mailop, exception_handler_fn)
            =
            wrap' mailop
            where
                fun wrap f x
                    =
                    f x
                    except
                        exn = exception_handler_fn exn;
                #
                fun wrap_base_mailop  is_mailop_ready_to_fire ()
                    =
                    case (is_mailop_ready_to_fire ())
                        #
                        MAILOP_IS_READY_TO_FIRE { priority, fire_mailop }       =>  MAILOP_IS_READY_TO_FIRE { priority, fire_mailop => wrap fire_mailop };
                        MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch    =>  MAILOP_IS_NOT_READY_TO_FIRE (wrap set_up_mailopready_watch);
                    esac;
                #
                fun wrap' (BASE_MAILOPS  base_mailops)  =>  BASE_MAILOPS  (map  wrap_base_mailop  base_mailops);
                    #
                    wrap' (CHOOSE_MAILOP mailops)       =>  CHOOSE_MAILOP (map  wrap'  mailops);
                    #
                    wrap' (DYNAMIC_MAILOP make_mailop)  =>  DYNAMIC_MAILOP           (fn ()     =  make_exception_handling_mailop (make_mailop(), exception_handler_fn));
                    wrap' (DYNAMIC_MAILOP_WITH_NACK f)  =>  DYNAMIC_MAILOP_WITH_NACK (fn mailop =  make_exception_handling_mailop (f mailop,      exception_handler_fn));
                end;
            end;

        Prepared_Mailops(X)
          = NACKFREE_MAILOPS  List( Base_Mailop(X) )
          | NACKFULL_MAILOPS  List( Prepared_Mailops(X) )
          | WITHNACK_MAILOP  (itt::Condition_Variable, Prepared_Mailops(X))
          ;

    /* +DEBUG
    fun sayGroup (msg, eg) = let
          fun f (NACKFREE_MAILOPS l, sl) = "NACKFREE_MAILOPS(" ! int::to_string (list::length l) ::()) ! sl
            | f (NACKFULL_MAILOPS l, sl) = "NACKFULL_MAILOPS(" ! g (l, ")" ! sl)
            | f (WITHNACK_MAILOP  l, sl) = "WITHNACK_MAILOP(" ! f(#2 l, ")" ! sl)
          also g ([], sl) = sl
            | g ([x], sl) = f (x, sl)
            | g (x ! r, sl) = f (x, ", " ! g (r, sl))
          in
            Debug::sayDebugId (string::cat (msg ! ": " ! f (eg, ["\n"])))
          end
    -DEBUG*/

        # Prepare mailop expression to run.
        # In particular, this evaluates all
        # dynamic rules to generate the actual
        # final rules to do_one_mailop between:
        #
        fun prepare (BASE_MAILOPS l)
                =>
                NACKFREE_MAILOPS l;

            prepare mailop
                =>
                prepare' mailop
                where
                    fun prepare' (DYNAMIC_MAILOP  make_mailop)
                            =>
                            prepare' (make_mailop ());

                        prepare' (DYNAMIC_MAILOP_WITH_NACK f)
                            =>
                            {   condvar = itt::CONDITION_VARIABLE (REF (itt::CONDVAR_IS_NOT_SET []));
                                #
                                WITHNACK_MAILOP  (condvar,  prepare' (f (wait_on_condvar'  condvar)));
                            };

                        prepare' (BASE_MAILOPS  mailops)
                            =>
                            NACKFREE_MAILOPS  mailops;

                        prepare' (CHOOSE_MAILOP  mailops)
                            =>
                            prepare_mailops (mailops, [])                                                               # Optimistically assume nackfree; we'll fall back toprepare_nackfull_mailops() routine if we're wrong.
                            where
                                fun prepare_mailops ([], mailops)
                                        =>
                                        NACKFREE_MAILOPS mailops;

                                    prepare_mailops (mailop ! rest, mailops')
                                        =>
                                        case (prepare' mailop)
                                            #
                                            NACKFREE_MAILOPS mailops =>  prepare_mailops          (rest,  mailops @                   mailops' );
                                            NACKFULL_MAILOPS mailops =>  prepare_nackfull_mailops (rest,  mailops @ [NACKFREE_MAILOPS mailops']);
                                            /*        */     mailops =>  prepare_nackfull_mailops (rest, [mailops,   NACKFREE_MAILOPS mailops']);
                                        esac;
                                end

                                also
                                fun prepare_nackfull_mailops ([], [group]) =>  group;                                           # The general case vs prepare_mailops above handling the nice simple (and common) case.
                                    prepare_nackfull_mailops ([], l)       =>  NACKFULL_MAILOPS l;
                                    #
                                    prepare_nackfull_mailops (mailop ! rest, l)
                                        =>
                                        case (prepare' mailop, l)
                                            #                                 
                                            (NACKFREE_MAILOPS mailops, NACKFREE_MAILOPS mailops' ! rest')
                                                =>
                                                prepare_nackfull_mailops (rest, NACKFREE_MAILOPS (mailops @ mailops') ! rest');

                                            (NACKFULL_MAILOPS mailops, l) =>   prepare_nackfull_mailops (rest, mailops @ l);
                                            (             mailops, l) =>   prepare_nackfull_mailops (rest, mailops ! l);                # Here 'mailops' can be NACKFREE_MAILOPS or WITHNACK_MAILOP.
                                       esac;
        end;    end;  end;  end;end;


        stipulate
            #
            count =  REF 0;                                     # Runs circularly around range 0..999,999
            #
            fun pick_fairly i                                   # The point here is just to pick fairly among 'i' alternative ready mailops,
                =                                               # so that we don't consistently pass over any one ready mailop.
                {   j =  *count;
                    #
                    if (j == 1000000)   count := 0;
                    else                count := j+1;
                    fi;

                    int::rem (j, i);
                };
        herein
            #
            fun pick_highest_priority_mailop_breaking_ties_fairly ([(_, fire_mailop)], _)
                    =>
                    fire_mailop;                                                                                                        # Only one choice -- easy work!

                pick_highest_priority_mailop_breaking_ties_fairly (l, n)
                    =>
                    max (l, 0, 0, [])
                    where
                        #
                        fun priority -1 =>  n;
                            priority  p =>  p;
                        end;
                        #
                        fun max ((p, fire_mailop) ! rest,  max_p,  k,  fire_mailop_fns)
                                =>
                                {   p =  priority p;
                                    #
                                    if    (p >  max_p)   max (rest,     p, 1,  [fire_mailop]                 );                         # mailop is higher priority than any other yet seen -- discard other candidates.
                                    elif  (p == max_p)   max (rest, max_p, k+1, fire_mailop ! fire_mailop_fns);                         # mailop is equal-highest priority yet encountered -- add it to list of candidates.
                                    else                 max (rest, max_p, k,                 fire_mailop_fns);                         # mailop is low-priority, ignore it.
                                    fi;
                                };

                            max ([], _, k, [fire_mailop])
                                =>
                                fire_mailop;                                                                                            # Scanned all mailops and we have a unique highest-priority mailop, so choose it.

                            max ([], _, k, fire_mailop_fns)
                                =>
                                list::nth (fire_mailop_fns, pick_fairly k);                                                             # Scanned all mailops and we have several highest-priority mailops, so break tie fairly.
                        end;
                    end;
             end;
        end;
        #
        fun make__do1mailoprun_status__and__finish_do1mailoprun ()
            =
            {   do1mailoprun_status =    REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_thread()));
                #
                finish_do1mailoprun    =    .{   do1mailoprun_status :=  itt::DO1MAILOPRUN_IS_COMPLETE;  };

                { do1mailoprun_status, finish_do1mailoprun };
            };


        stipulate
            # When we have exactly one mailop in the do_one_mailop[...] we can use simple logic:
            #
            fun do_one_mailop  (is_mailop_ready_to_fire:  Base_Mailop(X))
                =
                {
                    log::uninterruptible_scope_mutex := 1;
                    #
                    case (is_mailop_ready_to_fire ())
                        #             
                        MAILOP_IS_READY_TO_FIRE { fire_mailop, ... }                                                                    # Reppy refers to 'fire_mailop' as 'doFn'.
                            =>
                            fire_mailop ();

                        MAILOP_IS_NOT_READY_TO_FIRE  set_up_mailopready_watch
                            =>
                            {   (make__do1mailoprun_status__and__finish_do1mailoprun ())
                                    ->
                                    { do1mailoprun_status, finish_do1mailoprun };
                                #
                                set_up_mailopready_watch
                                  { do1mailoprun_status,
                                    finish_do1mailoprun,
                                    return_to__set_up_mailopready_watches__loop
                                        =>
                                        mps::dispatch_next_thread__usend__noreturn                                              # Since we have only one mailop, we do not need to actually loop here.
                                  };
                            };
                    esac;
                };

            Test_Mailops_For_Readiness_To_Fire__Result(X)
                #
                = NO_READY_MAILOPS { start_mailop_watch__fns:   List (itt::Set_Up_Mailopready_Watch__Fn(X)) }
                |    READY_MAILOPS { fire_mailop_fns:           List ((Int, (Void -> X))),      n: Int }
                ;


        herein

            # This function handles the case of picking
            # and firing one of a list of mailops thunks
            # (w/o any negative acknowledgements).
            #
            # It also handles NEVER.
            #
            fun do_nackfree_mailops []       =>  mps::dispatch_next_thread__noreturn ();                                        # 'do_one_mailop' with empty rule list -- nothing to do.
                do_nackfree_mailops [mailop] =>  do_one_mailop  mailop;                                                         # This is the only call to   do_one_mailop().
                #
                do_nackfree_mailops  mailops
                    =>
                    {
                        log::uninterruptible_scope_mutex := 1;                                                                  # Start uninterruptible scope. (Aka "critical section", "atomic region" etc.)
                        #
                        case (test_mailops_for_readiness_to_fire (mailops, []))
                            #
                            READY_MAILOPS { fire_mailop_fns, n }
                                =>
                                {   fire_mailop =   pick_highest_priority_mailop_breaking_ties_fairly  (fire_mailop_fns, n);    # Pick a do_one_mailop[....] mailop to fire ...
                                    #
                                    fire_mailop ();                                                                             # ... and then fire it.
                                };

                            NO_READY_MAILOPS { start_mailop_watch__fns }
                                =>
                                call_with_current_control_fate
                                    #
                                    (fn fate
                                        =
                                        {   set_up_mailopready_watches__loop  start_mailop_watch__fns;
                                            #   
                                            error "[set_up_mailopready_watches__loop]";                                         # Above should never return.
                                        }
                                        where
                                            switch_to_control_fate =  switch_to_control_fate  fate;
                                            #
                                            (make__do1mailoprun_status__and__finish_do1mailoprun ())
                                                ->
                                                { do1mailoprun_status, finish_do1mailoprun };
                                            #
                                            fun set_up_mailopready_watches__loop [] =>   mps::dispatch_next_thread__usend__noreturn ();
                                                #
                                                set_up_mailopready_watches__loop (set_up_mailopready_watch ! rest)
                                                    =>
                                                    switch_to_control_fate
                                                        #
                                                        (set_up_mailopready_watch
                                                          { do1mailoprun_status,
                                                            finish_do1mailoprun,
                                                            return_to__set_up_mailopready_watches__loop                         # maildrop.pkg, mailslot.pkg etc call this to return control to us.
                                                                =>
                                                                fn () =  set_up_mailopready_watches__loop rest
                                                          }
                                                        );
                                            end;
                                        end
                                    );
                            
                        esac;
                    }
                    where
                        #
                        fun test_mailops_for_readiness_to_fire (is_mailop_ready_to_fire ! rest,  start_mailop_watch__fns)                               # In this loop we have not yet found any mailops ready to fire.
                                =>
                                case (is_mailop_ready_to_fire ())
                                    #
                                    MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch        =>  test_mailops_for_readiness_to_fire   (rest, set_up_mailopready_watch ! start_mailop_watch__fns);
                                    MAILOP_IS_READY_TO_FIRE { priority, fire_mailop }           =>  test_mailops_for_readiness_to_fire'  (rest, [(priority, fire_mailop)], 1);
                                esac;

                            test_mailops_for_readiness_to_fire ([], start_mailop_watch__fns)                                                            # Done -- no ready-to-fire mailops found and no candidates left to check.
                                =>
                                NO_READY_MAILOPS { start_mailop_watch__fns };                                                                           # No mailops were ready to fire; return list of fns which
                        end                                                                                                                             # each start a watch on one mailop for readiness to fire.

                        also
                        fun test_mailops_for_readiness_to_fire' (is_mailop_ready_to_fire ! rest,  fire_mailop_fns,  n)                                  # In this loop we have found at least one mailop which is ready to fire.
                                =>
                                case (is_mailop_ready_to_fire ())
                                    #
                                    MAILOP_IS_READY_TO_FIRE { priority, fire_mailop } =>  test_mailops_for_readiness_to_fire' (rest, (priority, fire_mailop) ! fire_mailop_fns, n+1);
                                    _                                                 =>  test_mailops_for_readiness_to_fire' (rest,                           fire_mailop_fns, n  );
                                esac;

                            test_mailops_for_readiness_to_fire' ([],  fire_mailop_fns,  n)
                                =>
                                 READY_MAILOPS { fire_mailop_fns, n };                                                                                  # At least one mailop was ready to fire; return the fns which
                                                                                                                                                        # will fire the ready-to-fire mailops.

                        end;                                                                                                                            # fun do_ready_mailops

                                                                                                                                                        # NOTE: Maybe we should just keep
                                                                                                                                                        #       track of the max priority above?
                                                                                                                                                        #       What about fairness to fixed
                                                                                                                                                        #       priority mailops (e::g., always, timeout?)

                    end;                                                                                                                                # where
            end;                                                                                                                                        # fun do_nackfree_mailops
        end;                                                                                                                                            # stipulate

        stipulate
            # Walk the mailop group tree,
            # collecting the base mailops
            # (with associated ack flags),
            # also a list of nackstates.
            #
            # A nackstate is a
            #     (Condvar, List(Flag(Ack)))
            # pair, where the flags are
            # those associated with the
            # mailops covered by the nack
            # condvar.
            #
            fun gather_base_mailops_and_nackstates  mailops
                =
                case mailops
                    #
                    NACKFULL_MAILOPS _
                        =>
                        gather_mailops (mailops, [], [])
                        where 
                            un_wrapped_flag =  REF FALSE;
                            #
                            fun reverse_and_prepend_mailops  (mailop ! rest,  results) =>  reverse_and_prepend_mailops  (rest,  (mailop, un_wrapped_flag) ! results);
                                reverse_and_prepend_mailops  (           [],  results) =>  results;
                            end;
                            #
                            fun gather_mailops (NACKFREE_MAILOPS nackfree_mailops, bl, nackstates)
                                    =>
                                    (reverse_and_prepend_mailops (nackfree_mailops, bl), nackstates);

                                gather_mailops (NACKFULL_MAILOPS nackfull_mailops, bl, nackstates)
                                    =>
                                    fold_forward  f  (bl, nackstates)  nackfull_mailops
                                    where
                                        fun f (group', (bl', nackstates'))
                                            =
                                            gather_mailops (group', bl', nackstates');
                                    end;

                                gather_mailops (withnack as WITHNACK_MAILOP _, bl, nackstates)
                                    =>
                                    gather_wrapped (withnack, bl, nackstates);
                            end;
                        end;

                    group =>   gather_wrapped (group, [], []);
                esac
                where
                    fun gather_wrapped (group, bl, nackstates)
                            =
                            {    (gather (group, bl, [], nackstates))
                                    ->
                                    (bl, _, nackstates);

                                (bl, nackstates);
                            }
                            where
                                fun gather (NACKFREE_MAILOPS mailops, bl, all_flags, nackstates)
                                        =>
                                        {   (reverse_and_prepend_mailops (mailops, bl, all_flags))
                                                ->
                                                (bl', all_flags');

                                            (bl', all_flags', nackstates);
                                        }
                                        where
                                            fun reverse_and_prepend_mailops ([],  bl,  all_flags)
                                                    =>
                                                    (bl, all_flags);

                                                reverse_and_prepend_mailops (mailop ! rest,  bl,  all_flags)
                                                    =>
                                                    {   flag = REF FALSE;
                                                        #
                                                        reverse_and_prepend_mailops  (rest,  (mailop, flag) ! bl,  flag ! all_flags);
                                                    };
                                            end;
                                        end;


                                    gather (NACKFULL_MAILOPS group, bl, all_flags, nackstates)
                                        =>
                                        fold_forward  f  (bl, all_flags, nackstates)  group
                                        where
                                            fun f (group', (bl', all_flags', nackstates'))
                                                =
                                                gather (group', bl', all_flags', nackstates');
                                        end;

                                    gather (WITHNACK_MAILOP (condvar, group), bl, all_flags, nackstates)
                                        =>
                                        {   (gather (group, bl, [], nackstates))
                                                ->
                                                (bl', all_flags', nackstates');

                                            ( bl',
                                              all_flags' @ all_flags,
                                              (condvar, all_flags')  !  nackstates'
                                            );
                                        };
                                end;                                                                    # fun gather
                            end;                                                                        # where
                end;                                                                                    # where

            Test_Mailops_For_Readiness_To_Fire__Result(X)
                #
                = NO_READY_MAILOPS { start_mailop_watch__fns:   List ((itt::Set_Up_Mailopready_Watch__Fn(X), Ref(Bool))) }
                |    READY_MAILOPS { fire_mailop_fns:           List ((Int, ((Void -> X), Ref(Bool)))), n: Int }
                ;

        herein
            # This function handles the more
            # complicated case of running a
            # mailop expression where negative
            # acknowledgements are involved.
            #
            fun do_nackfull_mailops  group
                =
                {
                    log::uninterruptible_scope_mutex := 1;                                                      # Start uninterruptible scope (aka "critical section" aka "atomic region"...)
                    #
                    case (test_mailops_for_readiness_to_fire (mailops, []))
                        #
                        READY_MAILOPS { fire_mailop_fns,  n }
                            =>
                            {   (pick_highest_priority_mailop_breaking_ties_fairly (fire_mailop_fns, n))
                                    ->
                                    (fire_mailop, flag);

                                flag := TRUE;

                                send_nacks_as_appropriate ();

                                fire_mailop ();
                            };

                        NO_READY_MAILOPS { start_mailop_watch__fns }
                            =>
                            call_with_current_control_fate
                                (fn fate
                                    =
                                    {   set_up_mailopready_watches__loop  start_mailop_watch__fns;                                              # Ask each mailop in the do_one_mailop [...] list to enqueue itself as appropriate. 
                                        #
                                        error "[set_up_mailopready_watches__loop]";                                                             # This cannot happen -- set_up_mailopready_watches__loop() runs next thread when done.
                                    }
                                    where
                                        switch_to_control_fate =  switch_to_control_fate  fate;
                                        #
                                        do1mailoprun_status =   REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_thread ()));                # 'do1mailoprun_status' is basically a mutex ensuring exactly one mailop fires per do1mailoprun.
                                        #
                                        fun finish_do1mailoprun_fn  flag  ()                                                                    # This will be called by the first mailop to fire. 
                                            =
                                            {   do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
                                                flag := TRUE;
                                                send_nacks_as_appropriate ();
                                            };
                                        #
                                        fun set_up_mailopready_watches__loop [] =>   mps::dispatch_next_thread__usend__noreturn ();
                                            #
                                            set_up_mailopready_watches__loop ((set_up_mailopready_watch, flag)  !  rest)
                                                =>
                                                switch_to_control_fate
                                                    (
                                                        set_up_mailopready_watch
                                                          {
                                                            do1mailoprun_status,
                                                            finish_do1mailoprun =>  finish_do1mailoprun_fn flag,
                                                            return_to__set_up_mailopready_watches__loop                                         # maildrop.pkg, mailslot.pkg etc call this to return control to us.
                                                                =>
                                                                fn () =  set_up_mailopready_watches__loop rest
                                                          }
                                                    );
                                        end;                                                                                                    # fn set_up_mailopready_watches__loop
                                    end                                                                                                         # where
                                );

                    esac;       
                }
                where
                    (gather_base_mailops_and_nackstates  group)
                        ->
                        (mailops, nackstates);
                    #
                    fun send_nacks_as_appropriate ()
                        =
                        apply  check_condvar  nackstates                                                # NB: We capture 'nackstates' here for later use in 'finish_do1mailoprun_fn'.
                        where
                            # check_condvar checks the flags of a nackstate.
                            # If they are all FALSE then the
                            # corresponding condvar is set to signal
                            # the negative ack.
                            #
                            fun check_condvar (condvar, flags)
                                =
                                check_flags flags
                                where
                                    fun check_flags ((REF TRUE) ! _) =>  ();
                                        check_flags (_ ! rest)       =>  check_flags  rest;
                                        check_flags []               =>  set_condvar__iu  condvar;
                                    end;
                                end;
                        end;
                    #
                    fun test_mailops_for_readiness_to_fire ((is_mailop_ready_to_fire, flag) ! rest,   start_mailop_watch__fns)                  # In this loop we have not yet found a mailop ready to fire.
                            =>
                            case (is_mailop_ready_to_fire ())
                                #
                                MAILOP_IS_READY_TO_FIRE      { priority, fire_mailop }  =>  test_mailops_for_readiness_to_fire' (rest, [(priority, (fire_mailop, flag))], 1);
                                MAILOP_IS_NOT_READY_TO_FIRE  set_up_mailopready_watch   =>  test_mailops_for_readiness_to_fire  (rest, (set_up_mailopready_watch, flag) ! start_mailop_watch__fns);
                            esac;

                        test_mailops_for_readiness_to_fire ([], start_mailop_watch__fns)                                                        # Done -- no ready-to-fire mailops found and none left to check,
                            =>                                                                                                                  # so we must block until some mailop becomes ready to fire.
                            NO_READY_MAILOPS { start_mailop_watch__fns };
                    end

                    also
                    fun test_mailops_for_readiness_to_fire' ((is_mailop_ready_to_fire, flag) ! rest, fire_mailop_fns, n)                        # In this loop we have found at least one do_one_mailop[...] mailop ready to fire.
                            =>
                            case (is_mailop_ready_to_fire ())
                                #
                                MAILOP_IS_READY_TO_FIRE { priority, fire_mailop }
                                    =>
                                    test_mailops_for_readiness_to_fire'  (rest,  (priority,  (fire_mailop, flag)) ! fire_mailop_fns,  n+1);

                                _   =>   test_mailops_for_readiness_to_fire'  (rest,  fire_mailop_fns,  n);
                            esac;

                        test_mailops_for_readiness_to_fire' ([], fire_mailop_fns, n)                                                            # Done -- pick one do_one_mailop[...] mailop to fire and then fire it.
                            =>
                            READY_MAILOPS { fire_mailop_fns,  n };
                    end;
                                                                                                                        # NOTE: Maybe above we should just
                                                                                                                        # keep track of the max priority?
                                                                                                                        # What about fairness to fixed
                                                                                                                        # priority mailops (e::g., always, timeout?)
                                                                                                                        #

                end;                                                                                                    # fun do_nackfull_mailops
        end;                                                                                                            # stipulate


        #
        fun block_until_mailop_fires  mailop                                                                                            # External entrypoint.
            =
            case (prepare mailop)
                #
                NACKFREE_MAILOPS mailops =>  do_nackfree_mailops  mailops;
                /*            */ mailops =>  do_nackfull_mailops  mailops;
            esac;


                                                                                                                        # 'do_one_mailop' is our core entrypoint, the 'do_one_mailop' used by clients
                                                                                                                        # to do style handle-multiple-mail-sources thread I/O via
                                                                                                                        # statements looking like
                                                                                                                        #
                                                                                                                        #     do_one_mailop [
                                                                                                                        #         foo' ==> .{ do_this (); },
                                                                                                                        #         bar' ==> .{ do_that (); },
                                                                                                                        #         ...
                                                                                                                        #     ];
                                                                                                                        #
                                                                                                                        # We have two main cases:
                                                                                                                        #  1) If one or more mailops in the list is ready to fire, we pick one and fire it.
                                                                                                                        #  2) If no mailop in the list is ready to fire, we must block until one becomes ready, then continue as in 1).
        fun do_one_mailop  mailops
            =
            case (prepare_mailops (mailops, []))
                #
                NACKFREE_MAILOPS mailops  =>   do_nackfree_mailops  mailops;                                            # This is special-case handling for simple special case of no WITH_NACK mailops.
                /*            */ mailops  =>   do_nackfull_mailops  mailops;                                            # This is the general case.
            esac
            where
                # Preparation.  During this phase we need to:
                #
                #  o Expand DYNAMIC_MAILOP                                                                              # These are essentially hacks to allow generating
                #    and    DYNAMIC_MAILOP_WITH_NACK clauses.                                                           # mailops on the fly while 'do_one_mailop' is running.
                #
                #  o Figure out whether we have any WITH_NACK clauses,
                #    which complicate things.
                #
                #  In the common case of no WITH_NACK mailops
                #  we return NACKFREE_MAILOPS _, otherwise    
                #  we return NACKFULL_MAILOPS _ or WITHNACK_MAILOP _.
                # 
                fun prepare_mailops  (mailop ! rest,  mailops')                                                         # prepare_mailops          handles the nice simple nack-free case;
                        =>                                                                                              # prepare_nackfull_mailops handles the case where one or more WITH_NACK clauses are present.
                        case (prepare_one_mailop  mailop)
                            #
                            /* */ NACKFREE_MAILOPS mailops  =>  prepare_mailops           (rest,  mailops @                   mailops' );
                            /* */ NACKFULL_MAILOPS mailops  =>  prepare_nackfull_mailops  (rest,  mailops @ [NACKFREE_MAILOPS mailops']);
                            wm as WITHNACK_MAILOP   _       =>  prepare_nackfull_mailops  (rest, [wm,        NACKFREE_MAILOPS mailops']);
                        esac;

                    prepare_mailops ([], mailops)           =>  NACKFREE_MAILOPS  mailops;                              # Done!
                end

                also
                fun prepare_nackfull_mailops ([], [result]) =>  result;
                    prepare_nackfull_mailops ([], results)  =>  NACKFULL_MAILOPS results;
                    #
                    prepare_nackfull_mailops (mailop ! rest,  results)
                        =>
                        case (prepare_one_mailop  mailop,  results)
                            #
                            (NACKFREE_MAILOPS mailops,  NACKFREE_MAILOPS mailops' ! rest')
                                =>
                                prepare_nackfull_mailops (rest, NACKFREE_MAILOPS (mailops @ mailops') ! rest');

                            (NACKFULL_MAILOPS mailops, results) =>   prepare_nackfull_mailops  (rest,  mailops @ results);
                            (other,                    results) =>   prepare_nackfull_mailops  (rest,    other ! results);              # 'other' can be (NACKFREE_MAILOPS _) or (WITHNACK_MAILOP _).
                        esac;
                end

                also
                fun prepare_one_mailop (BASE_MAILOPS    mailops)     =>   NACKFREE_MAILOPS mailops;
                    prepare_one_mailop (CHOOSE_MAILOP   mailops)     =>   prepare_mailops (mailops, []);
                    prepare_one_mailop (DYNAMIC_MAILOP  make_mailop) =>   prepare_one_mailop (make_mailop ());                          # Generate a do_one_mailop[...] mailop on the fly, then recursively prepare it.
                    #
                    prepare_one_mailop (DYNAMIC_MAILOP_WITH_NACK make_mailop)                                                           # Like as DYNAMIC_MAILOP but with a nack mailop to signal client-code abort.
                        =>
                        {   condvar =  itt::CONDITION_VARIABLE  (REF  (itt::CONDVAR_IS_NOT_SET []));
                            #
                            WITHNACK_MAILOP  (condvar,  prepare_one_mailop  (make_mailop  (wait_on_condvar'  condvar)));
                        };

                end;
            end;                        # fun do_one_mailop
    };                                  # package mailop
end;

## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2013,
## released per terms of SMLNJ-COPYRIGHT.



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext