PreviousUpNext

15.4.928  src/lib/src/lib/thread-kit/src/core-thread-kit/oneshot-maildrop.pkg

## oneshot-maildrop.pkg
#
# The implementation set-once maildrops.

# Compiled by:
#     src/lib/std/standard.lib






###          "We're fools whether we dance or not,
###           so we might as well dance."
###
###                   -- Japanese proverb



stipulate
    package fat =  fate;                                # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;            # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package rwq =  rw_queue;                            # rw_queue                              is from   src/lib/src/rw-queue.pkg
    package mps =  microthread_preemptive_scheduler;    # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    Fate(X) =   fat::Fate(X);
    #
    call_with_current_fate =  fat::call_with_current_fate;
    switch_to_fate         =  fat::switch_to_fate;
herein

    package   oneshot_maildrop
    :         Oneshot_Maildrop                          # Oneshot_Maildrop                      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/oneshot-maildrop.api
    {
        # We use the same underlying representation
        # for both oneshots and maildrops:
        #
        Oneshot_Maildrop(X)
            =
            ONESHOT   { read_q:    rwq::Rw_Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)) ),
                        value:     Ref(  Null_Or(X) )
                      };



        exception MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP;
        #
        fun make_cell ()
            =
            ONESHOT { value    => REF NULL,
                      read_q   => rwq::make_rw_queue ()
                    };
        #
        fun same_cell (ONESHOT { value=>v1, ... }, ONESHOT { value=>v2, ... } )
            =
            v1 == v2;

        #
        fun make_transaction_id ()
            =
            REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_microthread()));

        #
        fun mark_do1mailoprun_complete_and_return_thread (do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread_id))
                =>
                {   do1mailoprun_status :=   itt::DO1MAILOPRUN_IS_COMPLETE;
                    #
                    thread_id;
                };

            mark_do1mailoprun_complete_and_return_thread  (REF (itt::DO1MAILOPRUN_IS_COMPLETE))
                =>
                raise exception DIE "Compiler bug:  Attempt to cancel already-cancelled transaction-id";                        # Never happens; here to suppress 'nonexhaustive match' compile warning.
        end;


        Qy_Item(X)
          = NO_ITEM
          | ITEM  ((Ref(itt::Do1mailoprun_Status), Fate(X)) )
          ;                                                                                                                     # ITEM should probably host a record not a tuple. XXX SUCKO FIXME.

        # Functions to clean channel input and output queues 
        #
        stipulate
            #
            fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _)  !  rest)
                    =>
                    clean rest;

                clean l => l;
            end;

            #
            fun clean_rev ([], l)
                    =>
                    l;

                clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest,  l)
                    =>
                    clean_rev (rest,  l);

                clean_rev (x ! rest,  l)
                    =>
                    clean_rev (rest,  x ! l);
            end;

        herein
            #

            #
            fun clean_and_remove (rwq::RW_QUEUE { front, back, ... } )
                =
                clean_front  *front
                where
                    fun clean_front [] =>   clean_back *back;
                        #
                        clean_front f
                            =>
                            case (clean f)
                                #
                                [] =>               clean_back  *back;

                                (item ! rest) =>    {   front :=  rest;
                                                        #
                                                        ITEM  item;
                                                    };
                            esac;

                    end

                    also
                    fun clean_back [] =>   NO_ITEM;
                        #
                        clean_back r
                            =>
                            {   back :=  [];
                                #
                                case (clean_rev (r, []))
                                    #
                                    [] =>  NO_ITEM;

                                    item ! rest
                                        =>
                                        {   front :=  rest;
                                            #
                                            ITEM  item;
                                        };
                                esac;
                            };
                    end;
                end;
            #
            fun clean_and_enqueue (rwq::RW_QUEUE { front, back, ... }, item)
                =
                clean_front  *front
                where
                    fun clean_front [] =>    clean_back  *back;
                        #
                        clean_front f
                            =>
                            case (clean f)
                                #
                                [] =>   clean_back  *back;

                                f' =>   {   front :=  f';
                                            #
                                            back :=  item ! *back;
                                        };
                            esac;
                    end

                    also
                    fun clean_back [] =>   front :=  [ item ];
                        #
                        clean_back r
                            =>
                            case (clean_rev (r, []))
                                #
                                [] => {  front := [item];  back  := []; };
                                rr => {  back  := [item];  front := rr; };
                            esac;
                    end;
                end;
        end;                                    # stipulate


        # When a thread is resumed after being blocked
        # on an oneshot get() op there may be other threads
        # also blocked on the oneshot.
        #
        # This function is used to propagate the message
        # to all of the threads that are blocked on the
        # variable.
        #
        # It must be called from an uninterruptible scope.
        # When the read_q is finally empty we end
        # the uninterruptible scope.
        #
        # We must use "clean_and_remove" to get items
        # from the read_q in the unlikely event that
        # a single thread executes a choice of
        # multiple gets on the same oneshot.
        #
        fun wake_remaining_microthreads_waiting_to_read_oneshot__xu (read_q, v)
            =
            case (clean_and_remove read_q)
                #
                NO_ITEM =>   log::uninterruptible_scope_mutex := 0;

                ITEM (do1mailoprun_status, get_v)
                    =>
                    call_with_current_fate
                        (\\ old_fate
                            =
                            {   mps::enqueue_old_thread_plus_old_fate_then_install_new_thread   { new_thread => mark_do1mailoprun_complete_and_return_thread  do1mailoprun_status,   old_fate };
                                #
                                switch_to_fate  get_v  v;                                               # 
                            }
                        );
            esac;


        # I-variables
        #
        make_oneshot_maildrop =  make_cell;
        same_oneshot_maildrop =  same_cell;
        #
        fun put_in_oneshot (ONESHOT { read_q, value }, v)
            =
            {
                                                                                        mps::assert_not_in_uninterruptible_scope "put_in_oneshot";
                log::uninterruptible_scope_mutex := 1;
                #
                case *value
                    #
                    NULL => {
                                value := THE v;
                                #
                                case (clean_and_remove  read_q)
                                    #
                                    NO_ITEM =>
                                        {
                                            log::uninterruptible_scope_mutex := 0;
                                        };
                                    #
                                    ITEM (do1mailoprun_status, get_v)
                                        =>
                                        call_with_current_fate
                                            (
                                             \\ old_fate
                                                =
                                                {
                                                    mps::enqueue_old_thread_plus_old_fate_then_install_new_thread   { new_thread => mark_do1mailoprun_complete_and_return_thread   do1mailoprun_status,   old_fate };
                                                    #
                                                    switch_to_fate  get_v  v;                                                           # 
                                                }
                                            );
                                esac;
                            };

                    THE _ =>
                        {
                            log::uninterruptible_scope_mutex := 0;
                            #
                            raise exception  MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP;
                        };
                esac;
            };
        #
        fun get_from_oneshot (ONESHOT { read_q, value } )
            =
            {
                                                                                                                mps::assert_not_in_uninterruptible_scope "get_from_oneshot";
                log::uninterruptible_scope_mutex := 1;
                #
                case *value
                    #         
                    NULL =>     {
                                    msg =   call_with_current_fate
                                                (\\ fate
                                                    =
                                                    {   
                                                        rwq::put_on_back_of_queue (read_q, (make_transaction_id(), fate));
                                                        #
                                                        mps::dispatch_next_thread__xu__noreturn ();
                                                    }
                                                );

                                    wake_remaining_microthreads_waiting_to_read_oneshot__xu (read_q, msg);

                                    msg;
                                };

                    THE v =>    {
                                    log::uninterruptible_scope_mutex := 0;
                                    v;
                                };
               esac;
            };
        #
        fun get_from_oneshot' (ONESHOT { read_q, value } )
            =
            itt::BASE_MAILOPS [is_mailop_ready_to_fire]
            where
                fun suspend_then_eventually_fire_mailop                                                         # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                                    # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__suspend_then_eventually_fire_mailops__loop                                   # After setting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     suspend_then_eventually_fire_mailop ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    {
                        (call_with_current_fate
                            (\\ get_v
                                =
                                {   rwq::put_on_back_of_queue
                                      ( read_q,
                                        (do1mailoprun_status,  get_v)
                                      );

                                    return_to__suspend_then_eventually_fire_mailops__loop ();                   # Return control to mailop.pgk
                                                                                                                raise exception DIE "maildrop: impossible";             # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
                                }
                            )
                        )
                            -> v;                                                                               # Execution will pick up here when 'get_v(v)' is eventually called.
                                                                                                                # This will happen when set() (above) eventually does:   switch_to_fate  get_v  v;

                        finish_do1mailoprun ();                                                                 # Keep any other mailops in the current select[...] from executing.

                        wake_remaining_microthreads_waiting_to_read_oneshot__xu (read_q, v);    # 

                        v;
                    };
                #
                fun is_mailop_ready_to_fire ()                                                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
                    =
                    case *value
                        #
                        NULL  =>    itt::UNREADY_MAILOP  suspend_then_eventually_fire_mailop;
                        #
                        THE v =>    itt::READY_MAILOP
                                      {
                                        fire_mailop => {.   log::uninterruptible_scope_mutex := 0;              # Reppy refers to 'fire_mailop' as 'doFn'.
                                                            v;
                                                        }

                                      };
                   esac;
            end;
        #
        fun nonblocking_get_from_oneshot (ONESHOT { read_q, value } )
            =
            {
                                                                                                                mps::assert_not_in_uninterruptible_scope "nonblocking_get_from_oneshot";
                log::uninterruptible_scope_mutex := 1;
                #
                case *value
                    #
                    THE v =>    {   log::uninterruptible_scope_mutex := 0;
                                    #
                                    THE v;
                                };

                    NULL  =>  NULL;
                esac;
            };

    };                                          # package maildrop1 
end;

## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext