PreviousUpNext

15.4.905  src/lib/src/lib/thread-kit/src/core-thread-kit/mailslot.pkg

## mailslot.pkg                                         # Derives from   cml/src/core-cml/channel.sml
#
# Mailslots implement synchronous message sends -- both
# threads must be ready before either can proceed.
#
# Despite being the most basic mail exchange mechanism,
# mailslots have the most intricate implementation,
# precisely because they involve a two-thread RENDEZVOUS
# rather than just two threads doing different things
# at different times.
#
# The rendezvous aspect means that the implementation
# is crucially about two threads doing something
# "at the same time".  (By the time you really understand
# this package, you will have a very good grasp of using
# call_with_current_fate ("call/cc") in a practical setting!)
#
# In practice, in our current timeslicing implementation,
# we never have two microthreads running at the same time,
# so what has to actually happen is that the first thread
# to arrive at the mailslot blocks and gets put on one of
# its wait queues;  when the second thread arrives the
# message transfer takes place and both threads are then
# free to proceed.  In practice one will exit actually running,
# while the other will be moved from the mailslot wait queue
# to the microthread_preemptive_scheduler run queue.
#
#
#     "To ensure that we always leave the atomic region exactly once, we
#      require that the blocking operation be responsible for leaving the
#      atomic region (in the mailop case, it must also execute the clean-up
#      action).  The fire_mailop fn always transfers control to the blocked thread
#      without leaving the atomic region.  Note that the give (and give')
#      set_up_mailopready_watch()s run using the receiver's thread state."
#                                       -- John H Reppy

# Compiled by:
#     src/lib/std/standard.lib





###                             "Two men enter -- one man leaves."
###
###                                     -- Mad Max: Beyond Thunderdome




stipulate
    package fat =  fate;                                # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package mop =  mailop;                              # mailop                                is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
    package rwq =  rw_queue;                            # rw_queue                              is from   src/lib/src/rw-queue.pkg
    package itt =  internal_threadkit_types;            # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package mps =  microthread_preemptive_scheduler;    # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    Fate(X) =   fat::Fate(X);
    #
    call_with_current_fate =  fat::call_with_current_fate;
    switch_to_fate         =  fat::switch_to_fate;
herein

    package mailslot: (weak)    api {
                                    Mailop(X);
                                    #
                                    include Mailslot;                           # Mailslot                      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailslot.api

                                    reset_mailslot:  Mailslot(X) -> Void;

                                }
    {
        # Local alias for mop::Mailop, matching the 'Mailop(X)' type declared in the package's api.
        #
        Mailop(X) =  mop::Mailop(X);

        # Some inline functions to improve performance 
        #
        fun enqueue (rwq::RW_QUEUE { back, ... }, x)
            =
            back :=  x ! *back;

        # A mailslot is a priority counter plus two wait queues:
        #
        #   priority   Starts at 1, is reset to 1 whenever a rendezvous on the slot completes,
        #              and is bumped by clean_and_check() each time a poll finds a live waiting partner.
        #
        #   in_q       Entries for blocked TAKE operations:  each pairs a do1mailoprun status
        #              refcell with the fate which is to be resumed with the message.
        #
        #   out_q      Entries for blocked GIVE operations:  each pairs a do1mailoprun status
        #              refcell with the fate which is to be resumed with the taker's (thread, fate) pair.
        #
        Mailslot(X)
            =
            MAILSLOT
              { priority:  Ref( Int ),
                in_q:      rwq::Rw_Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)) ),
                out_q:     rwq::Rw_Queue( (Ref( itt::Do1mailoprun_Status ), Fate( (itt::Microthread, Fate(X)) )) )
              };                                                                                                        # The above two should probably be records not tuples. XXX SUCKO FIXME.

        fun mailslot_to_string (MAILSLOT { priority => REF p, in_q, out_q })                                            # Debug support, primarily to textify mailslot state for logging via log::note or such.
            =
            {   i_len = list::length (rwq::to_list  in_q);
                o_len = list::length (rwq::to_list out_q);
                sprintf "prio=%d len(in_q)=%d len(out_q)=%d"  p  i_len  o_len;
            };

        fun reset_mailslot (MAILSLOT { priority, in_q, out_q } )
            =
            {   priority :=  1;
                #
                rwq::clear_queue_to_empty  in_q;
                rwq::clear_queue_to_empty  out_q;
            };

        fun make_mailslot ()
            =
            MAILSLOT
              { priority => REF 1,
                in_q     => rwq::make_rw_queue (),
                out_q    => rwq::make_rw_queue ()
              };

        fun same_mailslot                                                       # (Mailslot(X), Mailslot(X)) -> Bool 
            ( MAILSLOT { in_q=>in1, ... },
              MAILSLOT { in_q=>in2, ... }
            )
            =
            rwq::same_queue (in1, in2);


        fun make__mailop_done__refcell ()
            =
            REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_thread()) );


        fun end_do1mailoprun_and_return_thread (do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread_id))
                =>
                {   do1mailoprun_status :=   itt::DO1MAILOPRUN_IS_COMPLETE;
                    #
                    thread_id;
                };

            end_do1mailoprun_and_return_thread  (REF (itt::DO1MAILOPRUN_IS_COMPLETE))
                =>
                raise exception FAIL "Compiler bug:  Attempt to cancel already-cancelled transaction-id";                       # Never happens; here to suppress 'nonexhaustive match' compile warning.
        end;

        # Given a do1mailoprun_status refcell
        # set the current thread
        # to its thread state and
        # mark it complete:
        #
        fun set_current_thread  do1mailoprun_status
            =
            mps::set_current_thread  (end_do1mailoprun_and_return_thread  do1mailoprun_status);

        # Result type of clean_and_remove():
        #
        #   NO_ITEM    The queue held no live (not-yet-completed) entry.
        #   ITEM       The first live entry:  its do1mailoprun status refcell plus its fate.
        #
        Queue_Item(X)
          = NO_ITEM
          |    ITEM  (Ref(itt::Do1mailoprun_Status), Fate(X))
          ;                                                                                                                     # ITEM should probably host a record not a tuple.  XXX SUCKO FIXME.

        # Bump a priority value by one,
        # returning the old value:
        #
        fun bump_priority (p as REF n)
            =
            {   p := n+1;
                n;
            };

        # Functions to clean slot input and output queues 
        #
        stipulate

            fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest)
                    =>
                    clean rest;

                clean l  => l;
            end;

            fun clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest,  l)
                    =>
                    clean_rev (rest, l);

                clean_rev (x ! rest,  l)
                    =>
                    clean_rev (rest,  x ! l);

                clean_rev ([], l)
                    =>
                    l;
            end;

            fun clean_all l
                =
                reverse (clean_rev (l, []), [])
                where
                    fun reverse (x ! r, l)
                            =>
                            reverse (r, x ! l);

                        reverse ([], l)
                            =>
                            l;
                    end;
                end;

        herein

            fun clean_and_check (priority, rwq::RW_QUEUE { front, back } )
                = 
                clean_front  *front
                where
                    fun clean_front []
                            =>
                            clean_back *back;

                        clean_front f
                            =>
                            case (clean f)
                                #
                                [] =>   clean_back  *back;
                                #
                                f' =>   {   front :=  f';
                                            #
                                            bump_priority  priority;
                                        };
                            esac;
                    end

                    also
                    fun clean_back [] =>   0;
                        #
                        clean_back r
                            =>
                            {   back :=  [];
                                #
                                case (clean_rev (r, []))
                                    #
                                    [] =>   0;
                                    #
                                    rr =>   {   front :=  rr;
                                                #
                                                bump_priority  priority;
                                            };
                                esac;
                            };
                    end;
                end;

            fun clean_and_remove  (rwq::RW_QUEUE  { front,  back,  ...  } )
                =
                clean_front  *front
                where
                    fun clean_front []
                            =>
                            clean_back  *back;

                        clean_front f
                            =>
                            case (clean f)
                                #
                                []            =>    clean_back  *back;

                                (item ! rest) =>    {   front :=  rest;
                                                        #
                                                        ITEM item;
                                                    };
                           esac;
                    end

                    also
                    fun clean_back [] =>   NO_ITEM;
                        #
                        clean_back r
                            =>
                            {   back :=  [];
                                #
                                case (clean_rev (r, []))
                                    #
                                    []  => NO_ITEM;

                                    item ! rest
                                        =>
                                        {   front := rest;
                                            ITEM  item;
                                        };
                                 esac;
                            };
                    end;
                end;

            fun clean_and_enqueue (rwq::RW_QUEUE { front, back, ... }, item)
                =
                case (clean_all *front)
                    #
                    [] =>  {  front := clean_rev(*back, [item]);  back :=  [];                      };
                    f  =>  {  front := f;                         back :=  item ! clean_all *back;  };
                esac;

        end;                            # stipulate


        fun impossible ()
            =
            raise exception  FAIL "Slot: impossible";


        # Synchronously give 'msg' to a thread taking from this mailslot.
        #
        # After setting log::uninterruptible_scope_mutex to 1 there are two cases:
        #
        #   ITEM:     A live taker is already queued in in_q.  We capture our own fate,
        #             hand it together with the taker's thread to
        #             mps::enqueue_old_thread_plus_old_fate_then_install_new_thread
        #             (marking the taker's do1mailoprun as complete in the process),
        #             reset 'priority' to 1 and switch to the taker's fate, passing it 'msg'.
        #
        #   NO_ITEM:  No live taker is queued.  We push (fresh status refcell, our fate) onto out_q
        #             and call mps::dispatch_next_thread__usend__noreturn.  When our fate is
        #             later invoked with a taker's (thread, fate) pair we deliver 'msg'
        #             to it via mps::switch_to_thread__xu.
        #
        # NOTE(review): the 'mailslot as' binding is not used in the body.
        #
        fun put_in_mailslot (mailslot as MAILSLOT { priority, in_q, out_q }, msg)                                                       # Derives from Reppy's send()
            =
            {
                log::uninterruptible_scope_mutex := 1;
                #
                case (clean_and_remove  in_q)
                    #
                    ITEM (do1mailoprun_status, take_fateq)
                        =>
                        call_with_current_fate
                            (fn give_fatep = {   mps::enqueue_old_thread_plus_old_fate_then_install_new_thread                  # 'give' thread doesn't need to block, just yield to waiting 'take' thread.
                                                  {
                                                    new_thread =>  end_do1mailoprun_and_return_thread  do1mailoprun_status,
                                                    old_fate   =>  give_fatep
                                                  };
                                                #
                                                priority := 1;

                                                switch_to_fate  take_fateq  msg;                                                # take_fateq_revival_point.  This will canonically jump to take_fateq_resumption_point below.
                                            }
                            );

                    NO_ITEM
                        =>
                        {
                            (call_with_current_fate
                                (fn give_fateq                                                                  # give_fateq will canonically be revived at  give_fateq_revival_point  below.
                                    =
                                    {   enqueue (out_q, (make__mailop_done__refcell(), give_fateq));            # refcell saves give_thread.
                                        #
                                        mps::dispatch_next_thread__usend__noreturn ();
                                    }
                                )
                            ) -> (take_thread, take_fatep);                                                     # give_fateq_resumption_point:   When above give_fateq is eventually invoked we'll wind up here.

                            mps::switch_to_thread__xu
                              (take_thread, take_fatep, msg);                                                   # take_fatep_revival_point
                        };
                esac;
            };


        # Mailop flavor of put_in_mailslot:  instead of giving 'msg' immediately,
        # return an itt::BASE_MAILOPS value wrapping is_mailop_ready_to_fire (below),
        # which reports either that the give can fire now (via fire_mailop) or
        # how to wait for it to become possible (via set_up_mailopready_watch).
        #
        fun put_in_mailslot' (MAILSLOT { priority, in_q, out_q }, msg)                                                  # Derives from Reppy's sendEvt().
            =
            itt::BASE_MAILOPS [is_mailop_ready_to_fire]
            where
                # Complete the give against the taker at the head of in_q:
                # pop that entry, hand our own fate plus the taker's thread to the scheduler,
                # reset 'priority' to 1 and switch to the taker's fate with 'msg'.
                #
                # No cleaning of in_q is done here and 'the' is applied to the dequeue result;
                # presumably this is only called after is_mailop_ready_to_fire has found
                # a live taker at the head of in_q -- TODO confirm against mailop.pkg.
                #
                fun fire_mailop ()                                                                              # Reppy refers to 'fire_mailop' as 'doFn'.
                    =
                    {   (the (rwq::take_from_front_of_queue  in_q))
                            ->
                            (do1mailoprun_status,  take_fate);
                            

                        call_with_current_fate
                            (fn give_fate
                                =
                                {   mps::enqueue_old_thread_plus_old_fate_then_install_new_thread   { new_thread => end_do1mailoprun_and_return_thread  do1mailoprun_status,    old_fate => give_fate };
                                    #
                                    priority := 1;

                                    switch_to_fate  take_fate  msg;                                     # 
                                }
                            );
                    };

                fun set_up_mailopready_watch                                                    # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                    # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__set_up_mailopready_watches__loop                             # After starting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     set_up_mailopready_watch ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    # Here the watch consists of an out_q entry pairing the caller-supplied
                    # do1mailoprun_status with our fate.  When that fate is later invoked with
                    # a taker's (thread, fate) pair we run finish_do1mailoprun() and then
                    # deliver 'msg' to the taker via mps::switch_to_thread__xu.
                    #
                    {   
                        (call_with_current_fate
                            (fn give_fate
                                =
                                {   clean_and_enqueue (out_q, (do1mailoprun_status, give_fate));
                                    return_to__set_up_mailopready_watches__loop();                      # Return control to mailop.pkg.
                                                                                    impossible ();      # return_to__set_up_mailopready_watches__loop() should never return.
                                }
                            )
                        )
                            -> (take_thread, take_fate);                                                # Execution will pick up on this line when 'give_fate' is eventually called.

                        finish_do1mailoprun();

                        mps::switch_to_thread__xu (take_thread, take_fate, msg);
                    };

                # Poll for a waiting taker.  clean_and_check returns 0 when in_q holds no live
                # entry; otherwise it returns the slot's pre-bump priority value.
                # NB: The pattern variable 'priority' in the second rule is an Int which
                # shadows the slot's 'priority' refcell within that rule.
                #
                fun is_mailop_ready_to_fire ()
                    =
                    case (clean_and_check (priority, in_q))
                        #
                        0        =>  itt::MAILOP_IS_NOT_READY_TO_FIRE  set_up_mailopready_watch;
                        #
                        priority =>  itt::MAILOP_IS_READY_TO_FIRE    { priority, fire_mailop };
                    esac;
            end;

        fun nonblocking_put_in_mailslot (MAILSLOT { priority, in_q, out_q }, msg)                                       # Derives from Reppy's sendPoll().
            =
            call_with_current_fate
                (
                 fn give_fate                                                                           # Is this 'give_fate' really simply discarded?
                    =
                    {
                        log::uninterruptible_scope_mutex := 1;
                        #
                        case (clean_and_remove  in_q)
                            #
                            ITEM (rid, take_fate)
                                =>
                                {   call_with_current_fate
                                        (
                                         fn give_fate
                                            =
                                            {   mps::enqueue_old_thread_plus_old_fate_then_install_new_thread
                                                  {
                                                    new_thread =>  end_do1mailoprun_and_return_thread  rid,
                                                    old_fate   =>  give_fate
                                                  };
                                                #
                                                priority :=  1;

                                                switch_to_fate  take_fate  msg;                         # NB: switch_to_fate never returns.
                                            }
                                        );

                                    TRUE;
                                };

                            NO_ITEM
                                =>
                                {   log::uninterruptible_scope_mutex := 0;
                                    #
                                    FALSE;
                                };
                        esac;
                    }
                );

        # Synchronously take a message from this mailslot and return it.
        #
        # After setting log::uninterruptible_scope_mutex to 1 there are two cases:
        #
        #   ITEM:     A live giver is already queued in out_q.  We remember our own thread,
        #             make the giver's thread current (marking its do1mailoprun complete),
        #             reset 'priority' to 1, then capture our fate and switch to the giver's
        #             fate, passing it our (thread, fate) pair.  The value our fate is
        #             eventually resumed with is returned as the result.
        #
        #   NO_ITEM:  No live giver is queued.  We push (fresh status refcell, our fate) onto in_q
        #             and call mps::dispatch_next_thread__usend__noreturn.  When our fate is
        #             later resumed with a message we reset log::uninterruptible_scope_mutex
        #             to 0 and return that message.
        #
        # NOTE(review): the 'mailslot as' binding is not used in the body.
        #
        fun take_from_mailslot (mailslot as MAILSLOT { priority, in_q, out_q } )                                                        # Derives from Reppy's recv().
            =
            {
                log::uninterruptible_scope_mutex := 1;
                #
                case (clean_and_remove  out_q)
                    #
                    ITEM (do1mailoprun_status, give_fateq)
                        =>
                        {
                            take_thread =  mps::get_current_thread ();
                            #
                            set_current_thread  do1mailoprun_status;                                    # Make 'give' thread the current thread.

                            priority := 1;

                            (call_with_current_fate
                              (
                                fn take_fatep
                                    =   
                                    switch_to_fate  give_fateq  (take_thread, take_fatep)               # give_fateq_revival_point:  this is canonically a jump to give_fateq_resumption_point above.
                                                                                                        # The fate passed here must NOT exit UNINTERRUPTIBLE MODE because take_thread_revival_point
                                                                                                        # invokes it via mps::switch_to_thread__xu.
                              )
                            ) -> result;                                                                # take_fatep_resumption_point:  take_fatep above will resume here once revived. 

                            result;
                        };

                    NO_ITEM
                        =>
                        {
                            (call_with_current_fate
                              (
                                fn take_fateq
                                    =
                                    {   do1mailoprun_status =  make__mailop_done__refcell();
                                        #
                                        enqueue (in_q, (do1mailoprun_status, take_fateq));                      # take_fateq will canonically be revived at  take_fateq_revival_point  above.
                                                                                                        # The fate enqueued here needs to EXIT UNINTERRUPTIBLE MODE
                                        #
                                        mps::dispatch_next_thread__usend__noreturn ();
                                    }
                              )
                            ) -> result;                                                                # take_fateq_resumption_point:  take_fateq above will resume here once revived.

                            log::uninterruptible_scope_mutex := 0;

                            result;
                        };
                esac;
            };

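# Debugging notes:  At the moment a (the?) problematic sequence is the one traced below.
# In these traces 'd' appears to be the uninterruptible-scope nesting depth, as
# reported by  mps::get_uninterruptible_scope_nesting_depth ():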
#
#
#
#
# [A] give/TOP mode d=0  (about to enter uninterruptible mode)
# [B] give/AAA mode d=1  (after entering uninterruptible mode)
# [C] give/NO_ITEM/MMM mode d=1
#     dispatch_next_thread__usend__noreturn/AAA: mode d=1
#     dispatch_next_thread__usend__noreturn/CCC: mode d=1
#     test_basic_mailslot_functionality_a/AAA (above take):  mps::get_uninterruptible_scope_nesting_depth () d=0
# [D] take/TOP mode d=0
# [E] take/ITEM/AAA mode d=1
# [F] take/ITEM/DDD mode d=1 (doing switch_to_fate give_fate)
# [G] give/NO_ITEM/QQQ mode d=1 (calling switch_to_thread__xu (take_thread, take_fate, msg))
# [H] take/YYY mode d=-1
#
# [1] take/TOP mode d=0
# [2] take/NO_ITEM/AAA mode d=1
# [3] take/NO_ITEM/BBB mode d=1
#     dispatch_next_thread__usend__noreturn/AAA: mode d=1
#     dispatch_next_thread__usend__noreturn/CCC: mode d=1
#     test_basic_mailslot_functionality_b/send LUP(7)post-give:  mps::get_uninterruptible_scope_nesting_depth () d=0
#     test_basic_mailslot_functionality_b/send LUP(8)TOP:        mps::get_uninterruptible_scope_nesting_depth () d=0
# [4] give/TOP mode d=0  (about to enter uninterruptible mode)
# [5] give/AAA mode d=1  (after entering uninterruptible mode)
# [6] give/ITEM mode d=1
# [7] give/ITEM/JJJ mode d=1  (just before switch_to_fate)
#     wake_sleeping_threads_and_schedule_fd_io_and_harvest_dead_subprocesses__usend__fate mode d=1
#     dispatch_next_thread__usend__noreturn/AAA: mode d=2
#     dispatch_next_thread__usend__noreturn/CCC: mode d=2
#
#     take/TOP mode d=0
#     take/NO_ITEM/AAA mode d=1
#     take/NO_ITEM/BBB mode d=1
#     dispatch_next_thread__usend__noreturn/AAA: mode d=1
#     dispatch_next_thread__usend__noreturn/CCC: mode d=1
#     test_basic_mailslot_functionality_b/send LUP(1)post-give:  mps::get_uninterruptible_scope_nesting_depth () d=0
#     test_basic_mailslot_functionality_b/send LUP(2)TOP:        mps::get_uninterruptible_scope_nesting_depth () d=0
#     give/TOP mode d=0  (about to enter uninterruptible mode)
#     give/AAA mode d=1  (after entering uninterruptible mode)
#     give/ITEM mode d=1
#     give/ITEM/JJJ mode d=1  (just before switch_to_fate)
#     take/ZZZ mode d=1
#
#
# [1] take/TOP mode d=0
# [2] take/NO_ITEM/AAA mode d=1
# [3] take/NO_ITEM/BBB mode d=1
#     dispatch_next_thread__usend__noreturn/AAA: mode d=1
#     dispatch_next_thread__usend__noreturn/CCC: mode d=1
#     test_basic_mailslot_functionality_b/send LUP(1)post-give:  mps::get_uninterruptible_scope_nesting_depth () d=0
#     test_basic_mailslot_functionality_b/send LUP(2)TOP:        mps::get_uninterruptible_scope_nesting_depth () d=0
# [4] give/TOP mode d=0  (about to enter uninterruptible mode)
# [5] give/AAA mode d=1  (after entering uninterruptible mode)
# [6] give/ITEM mode d=1
# [7] give/ITEM/JJJ mode d=1  (just before switch_to_fate)
# [8] take/ZZZ mode d=1

        # Mailop flavor of take_from_mailslot:  instead of taking a message immediately,
        # return an itt::BASE_MAILOPS value wrapping is_mailop_ready_to_fire (below),
        # which reports either that the take can fire now (via fire_mailop) or
        # how to wait for it to become possible (via set_up_mailopready_watch).
        #
        fun take_from_mailslot' (MAILSLOT { priority, in_q, out_q } )                                                   # Derives from Reppy's recvEvt().
            =
            itt::BASE_MAILOPS [is_mailop_ready_to_fire]
            where
                # Complete the take against the giver at the head of out_q:
                # pop that entry, remember our own thread, make the giver's thread current
                # (marking its do1mailoprun complete), reset 'priority' to 1, then capture our
                # fate and switch to the giver's fate, passing it our (thread, fate) pair.
                #
                # No cleaning of out_q is done here; presumably this is only called after
                # is_mailop_ready_to_fire has found a live giver at the head of out_q
                # -- TODO confirm against mailop.pkg.
                #
                fun fire_mailop ()                                                                      # Reppy refers to 'fire_mailop' as 'doFn'.
                    =
                    {   (rwq::take_from_front_of_queue_or_raise_exception  out_q)
                            ->
                            (do1mailoprun_status, give_fate);

                        my_id =  mps::get_current_thread ();

                        set_current_thread  do1mailoprun_status;

                        priority := 1;

                        call_with_current_fate
                            #
                            (fn take_fate
                                =
                                switch_to_fate  give_fate  (my_id,  take_fate)                          # 
                            );
                    };

                fun set_up_mailopready_watch                                                            # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                            # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                            # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__set_up_mailopready_watches__loop                                     # After starting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     set_up_mailopready_watch ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    # Here the watch consists of an in_q entry pairing the caller-supplied
                    # do1mailoprun_status with our fate.  When that fate is later resumed with
                    # a message we run finish_do1mailoprun(), reset
                    # log::uninterruptible_scope_mutex to 0 and return the message.
                    #
                    {   (call_with_current_fate
                            (
                             fn take_fate
                                =
                                {   clean_and_enqueue  (in_q,  (do1mailoprun_status, take_fate));
                                    #
                                    return_to__set_up_mailopready_watches__loop ();                     # Return control to mailop.pkg.
                                                                                        impossible();   # return_to__set_up_mailopready_watches__loop () should never return.
                                }
                            )
                        )
                            -> msg;                                                                     # Execution will pick up on this line when 'take_fate' is eventually called.

                        finish_do1mailoprun ();

                        log::uninterruptible_scope_mutex := 0;

                        msg;
                    };

                # Poll for a waiting giver.  clean_and_check returns 0 when out_q holds no live
                # entry; otherwise it returns the slot's pre-bump priority value.
                # NB: The pattern variable 'priority' in the second rule is an Int which
                # shadows the slot's 'priority' refcell within that rule.
                #
                fun is_mailop_ready_to_fire ()
                    =
                    case (clean_and_check (priority, out_q))
                        #
                        0        =>  itt::MAILOP_IS_NOT_READY_TO_FIRE  set_up_mailopready_watch;
                        #
                        priority =>  itt::MAILOP_IS_READY_TO_FIRE    { priority, fire_mailop };
                    esac;
            end;

        # Take a message only if a giver is already waiting on the slot;
        # never wait for a giver to show up.
        #
        # After setting log::uninterruptible_scope_mutex to 1:
        #
        #   ITEM:     A live giver is queued in out_q.  We capture our fate, remember our own
        #             thread, make the giver's thread current (marking its do1mailoprun complete),
        #             reset 'priority' to 1 and switch to the giver's fate, passing it our
        #             (thread, fate) pair.  The value our fate is eventually resumed with
        #             is returned wrapped in THE.
        #
        #   NO_ITEM:  Nobody is waiting: reset log::uninterruptible_scope_mutex to 0 and return NULL.
        #
        fun nonblocking_take_from_mailslot (MAILSLOT { priority, in_q, out_q } )                                        # Derives from Reppy's recvPoll().
            =
            {
                log::uninterruptible_scope_mutex := 1;
                #
                case (clean_and_remove out_q)
                    #
                    ITEM (do1mailoprun_status, give_fate)
                        =>
                        THE (call_with_current_fate
                                (fn take_fate
                                    =
                                    {   my_id =  mps::get_current_thread ();
                                        #
                                        set_current_thread  do1mailoprun_status;

                                        priority := 1;

                                        switch_to_fate  give_fate  (my_id,  take_fate);         # 
                                    }
                            )   );

                    NO_ITEM
                        =>
                        {   log::uninterruptible_scope_mutex := 0;
                            #
                            NULL;
                        };
                esac;
            };
    };
end;

## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2013,
## released per terms of SMLNJ-COPYRIGHT.



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext