PreviousUpNext

15.4.922  src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.pkg

## maildrop.pkg                                                 # Derives from   cml/src/core-cml/sync-var.sml
#
# Maildrops are essentially concurrency-safe replacements for REF cells:
#
#  o  Attempting to read from an empty maildrop will block the reading
#     microthread until some other thread deposits a value in the maildrop. 
#
#  o  'take' ops remove the value from a maildrop, leaving it empty.
#
#  o  'peek' ops read the value from a maildrop while leaving it unchanged.
#
#  o  The value stored in a maildrop may be removed, replaced or swapped.

# Compiled by:
#     src/lib/std/standard.lib



###          "We're fools whether we dance or not,
###           so we might as well dance."
###
###                   -- Japanese proverb



stipulate
    package fat =  fate;                                                                                        # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                                                                    # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package rwq =  rw_queue;                                                                                    # rw_queue                              is from   src/lib/src/rw-queue.pkg
    package mps =  microthread_preemptive_scheduler;                                                            # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    call_with_current_fate =  fat::call_with_current_fate;
    switch_to_fate         =  fat::switch_to_fate;
herein

    package   maildrop
    :         Maildrop                                                                                          # Maildrop                              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.api
    {
        exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;
        #
        Fate(X) =   fat::Fate(X);
        #
        Maildrop(X) =   MAILDROP  { read_q:    rwq::Rw_Queue( (Ref(itt::Do1mailoprun_Status), Fate(X)) ),
                                    value:     Ref( Null_Or(X) )
                                  };


        fun maildrop_to_string (MAILDROP { read_q, value }, name)                                               # Debug support, primarily to textify mailslot state for logging via log::note or such.
            =
            {   sprintf "{MD:%s %s [%s] }"
                        #
                        name
                        #
                        case *value NULL => "EMPTY";
                                    _    => "FULL";
                        esac
                        #
                        (sprint_readqueue  read_q);
            }
            where
                fun sprint_thread  thread
                    =
                    {   thread -> itt::MICROTHREAD { thread_id, task, ... };
                        task   -> itt::APPTASK   { task_name, task_id, ... };
                        #
                        sprintf "%d:%d"  thread_id  task_id;
                    };
                #
                fun sprint_readqueue q
                    = 
                    {
                        (string::join  " "  (map  sprint_q_entry (rwq::frontq q)))  +  "|"  +
                        (string::join  " "  (map  sprint_q_entry (rwq::backq  q)));
                    }
                    where
                        fun sprint_q_entry  (REF (itt::DO1MAILOPRUN_IS_COMPLETE), _)
                                =>
                                "*";

                            sprint_q_entry  (REF (itt::DO1MAILOPRUN_IS_BLOCKED  microthread), _)
                                =>
                                sprint_thread  microthread;
                        end;
                    end;
            end;

        fun same_cell ( MAILDROP { value => v1, ... },
                        MAILDROP { value => v2, ... }
                      )
            =
            v1 == v2;


        fun make_do1mailoprun_status ()
            =
            REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_microthread()));


        fun mark_do1mailoprun_complete_and_return_thread  (do1mailoprun_status  as  REF  (itt::DO1MAILOPRUN_IS_BLOCKED  microthread_id))
                =>
                {   do1mailoprun_status :=   itt::DO1MAILOPRUN_IS_COMPLETE;
                    #
                    microthread_id;
                };

            mark_do1mailoprun_complete_and_return_thread  (REF (itt::DO1MAILOPRUN_IS_COMPLETE))
                =>
                raise exception DIE "Compiler bug:  Attempt to cancel already-cancelled transaction-id";        # Never happens; here to suppress 'nonexhaustive match' compile warning.
        end;


        Qy_Item(X)
          #
          = NO_ITEM
          | ITEM  ((Ref itt::Do1mailoprun_Status,  Fate(X)))
          ;

        # Functions to clean channel input and output queues 
        #
        stipulate

            fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest)                                           # Drop any cancelled transactions at start of list.
                    =>
                    clean rest;

                clean l  =>  l;                                                                                 # Stop at first non-COMPLETE entry in list -- otherwise
            end;                                                                                                # clean_queue_and_remove_one_entry will be O(N**2) instead of O(N).

            fun clean_rev ([], result)                                                                          # Drop all cancelled transactions from list; result is in reverse order.
                    =>
                    result;

                clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest,  result)                              # Drop cancelled transaction.
                    =>
                    clean_rev (rest, result);

                clean_rev (x ! rest,  result)
                    =>
                    clean_rev (rest, x ! result);
            end;

        herein

            fun clean_queue_and_remove_one_entry (rwq::RW_QUEUE { front, back, ... } )                          # Drop cancelled transactions and return first non-cancelled one.
                =
                clean_front *front
                where
                    fun clean_front [] =>   clean_back  *back;
                        #
                        clean_front f
                            =>
                            case (clean f)
                                #
                                []            =>    clean_back  *back;

                                (item ! rest) =>    {   front :=  rest;
                                                        #
                                                        ITEM  item;
                                                    };
                            esac;
                    end

                    also
                    fun clean_back [] =>   NO_ITEM;
                        #
                        clean_back r
                            =>
                            {   back :=  [];
                                #
                                case (clean_rev (r, []))
                                    #
                                    [] => NO_ITEM;

                                    item ! rest
                                        =>
                                        {   front :=  rest;
                                            #
                                            ITEM item;
                                        };
                                esac;
                            };
                    end;
                end;

        end;                                    # stipulate


        # When a microthread is resumed after being blocked
        # on a maildrop 'get' or 'take' op there may
        # be other threads also blocked on the maildrop.
        #
        # This function is used to propagate the message
        # to the threads that are blocked on the maildrop.
        #
        # It must be called from an uninterruptible scope.
        # When the read_q is finally empty we exit the
        # uninterruptible scope.
        #
        # We must use "clean_queue_and_remove_one_entry"
        # to get items from the read_q to cover the unlikely
        # case that a single microthread executes a choice of
        # multiple gets on the same maildrop.
        #                                                                                                       # Called from   get_from_maildrop   and   get_from_maildrop'.
        fun wake_remaining_microthreads_waiting_to_read_maildrop__xu (read_q, v)                                # 'v' is the value being stored into the maildrop.
            =
            case (clean_queue_and_remove_one_entry  read_q)
                #
                NO_ITEM =>  {
                                log::uninterruptible_scope_mutex := 0;
                            };

                ITEM (do1mailoprun_status, get_v)
                    =>
                    call_with_current_fate
                        (\\ old_fate
                            =
                            {   new_thread = mark_do1mailoprun_complete_and_return_thread  do1mailoprun_status;
                                #
                                mps::enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread, old_fate };
                                #
                                switch_to_fate  get_v  v;                                                       # In essence do get_v(v) and continue interrupted 'get' or 'take' op.
                                                                                                                # The fn we return to will call us right back to continue processing read_q
                            }                                                                                   # unless it is a 'take', which leaves no value to be read.
                        );
            esac;

        fun impossible ()
            =
            raise exception  DIE "maildrop: impossible";


        fun make_empty_maildrop ()                                                                              # Id calls these "m-variables".  (Thought you'd want to know.)
            =
            MAILDROP  { value    =>  REF NULL,
                        read_q   =>  rwq::make_rw_queue ()
                      };

        fun make_full_maildrop  initial_value
            =
            MAILDROP { read_q   =>  rwq::make_rw_queue (),
                       value    =>  REF (THE initial_value)
                     };

        same_maildrop =  same_cell;


        fun put_in_maildrop (maildrop as MAILDROP { read_q, value }, v)                                         # 'v' is value being stored into the maildrop.
            =
            {
                                                                                                                microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "put_in_maildrop";
                log::uninterruptible_scope_mutex := 1;
                #
                case *value
                    #
                    NULL =>
                        {
                            value := THE v;
                            #
                            case (clean_queue_and_remove_one_entry  read_q)
                                #
                                NO_ITEM =>  log::uninterruptible_scope_mutex := 0;

                                ITEM (do1mailoprun_status, get_v)
                                    =>
                                    call_with_current_fate
                                        #
                                        (\\ old_fate
                                            =
                                            {   new_thread =  mark_do1mailoprun_complete_and_return_thread
                                                                do1mailoprun_status;
                                                #
                                                mps::enqueue_old_thread_plus_old_fate_then_install_new_thread
                                                  { new_thread, old_fate };

                                                switch_to_fate  get_v  v;                                       # Do the get_v(v) call that some get_* or take_* is waiting for.
                                            }
                                        );
                            esac;
                        };

                    THE _ =>
                        {   log::uninterruptible_scope_mutex := 0;
                            #
                            raise exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;
                        };
                esac;
            };


        fun take_from_maildrop' (MAILDROP { read_q, value } )
            =
            itt::BASE_MAILOPS [is_mailop_ready_to_fire]
            where
                fun suspend_then_eventually_fire_mailop                                                         # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                                    # This typically does  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and then sends nacks as appropriate.
                        return_to__suspend_then_eventually_fire_mailops__loop                                   # After starting up a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     suspend_then_eventually_fire_mailop ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    {
                        (call_with_current_fate
                            (\\ fate
                                =
                                {   rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, fate));
                                    #
                                    return_to__suspend_then_eventually_fire_mailops__loop ();                   # Return control to mailop.pkg
                                                                                                impossible ();  # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
                                }
                            )
                        )
                            ->  v;                                                                              # Execution will pick up here when fill() (above) eventually does:   switch_to_fate  get_v  v;

                        finish_do1mailoprun ();                                                                 # Remember that this do_one_mailop[] call has fired a mailop -- no other mailop in call is eligible to fire.

                        value := NULL;                                                                          # Empty the maildrop.

                        log::uninterruptible_scope_mutex := 0;                                                  # End uninterruptible scope.

                        v;                                                                                      # Return value read from maildrop.
                    };

                fun is_mailop_ready_to_fire ()                                                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'
                    =
                    case *value
                        #
                        NULL  =>    itt::UNREADY_MAILOP  suspend_then_eventually_fire_mailop;
                        #
                        THE v =>    itt::READY_MAILOP
                                      {
                                        fire_mailop => {.   value := NULL;                                      # Reppy refers to 'fire_mailop' as 'doFn'.
                                                            log::uninterruptible_scope_mutex := 0;
                                                            v;
                                                        }
                                      };
                    esac;
            end;


        fun nonblocking_take_from_maildrop (MAILDROP { read_q, value } )
            =
            {
                                                                                                                microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "nonblocking_take_from_maildrop";
                log::uninterruptible_scope_mutex := 1;
                #
                case *value
                    #
                    THE v =>    {   value := NULL;
                                    log::uninterruptible_scope_mutex := 0;
                                    THE v;
                                };

                    NULL =>     NULL;
                esac;
            };


        fun take_from_maildrop (maildrop as MAILDROP { read_q, value } )                                        # Destructive read.
            =
            {
                                                                                                                microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "take_from_maildrop";
                log::uninterruptible_scope_mutex := 1;
                #
                case *value
                    #
                    THE v =>    {
                                    value := NULL;
                                    log::uninterruptible_scope_mutex := 0;
                                    v;
                                };

                    NULL =>     {
                                    (call_with_current_fate
                                        (\\ get_v = {   rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), get_v));
                                                        #
                                                        mps::dispatch_next_thread__xu__noreturn ();
                                                    }
                                        )) -> v;                                                                # When get_v(v) finally gets called, the 'v' winds up here.

                                    value := NULL;
                                                                                                                # Note that we do not call wake_remaining_microthreads_waiting_to_read_maildrop__xu()
                                                                                                                # here because there is no value left in maildrop for them to read.
                                    log::uninterruptible_scope_mutex := 0;

                                    v;
                                };
                esac;
            };


        fun get_from_maildrop (maildrop as MAILDROP { read_q, value } )                                         # Pure read.
            =
            {
                                                                                                                microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "get_from_maildrop";
                log::uninterruptible_scope_mutex := 1;
                #
                case *value
                    #
                    THE v =>    {
                                    log::uninterruptible_scope_mutex := 0;
                                    v;
                                };

                    NULL =>     {
                                    (call_with_current_fate
                                        (\\ get_v = {   rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), get_v));
                                                        #
                                                        mps::dispatch_next_thread__xu__noreturn ();
                                                    }
                                        )) -> v;                                                                # When get_v(v) finally gets called, the 'v' winds up here.     

                                    wake_remaining_microthreads_waiting_to_read_maildrop__xu (read_q, v);       # 

                                    v;
                                };
                esac;
            };


        fun get_from_maildrop' (maildrop as MAILDROP { read_q, value } )
            =
            itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
            where
                fun suspend_then_eventually_fire_mailop                                                         # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                                    # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__suspend_then_eventually_fire_mailops__loop                                   # After starting a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     suspend_then_eventually_fire_mailop ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    {
                        (call_with_current_fate
                            (\\ get_v
                                =
                                {   rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, get_v));
                                    #
                                    return_to__suspend_then_eventually_fire_mailops__loop ();                   # Return control to mailop.pkg.

                                                                impossible ();                                  # return_to__suspend_then_eventually_fire_mailops__loop should never return.
                                }
                            )
                        )
                            -> v;                                                                               # Execution will resume on this line when 'get_v(v)' is eventually done.

                        finish_do1mailoprun();                                                                  # Remember that this do_one_mailop[] call has fired a mailop -- no other mailop in call is eligible to fire.

                        wake_remaining_microthreads_waiting_to_read_maildrop__xu (read_q, v);                   # 

                        v;
                    };

                fun is_mailop_ready_to_fire ()                                                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
                    =
                    case *value
                        #               
                        NULL  =>    itt::UNREADY_MAILOP  suspend_then_eventually_fire_mailop;
                        #
                        THE v =>    {
                                        itt::READY_MAILOP
                                          {
                                            fire_mailop => {.   log::uninterruptible_scope_mutex := 0;          # Reppy refers to 'fire_mailop' as 'doFn'.
                                                                v;
                                                            }
                                          };
                                    };
                    esac;
            end;



        fun nonblocking_get_from_maildrop (maildrop as MAILDROP { value, ... } )
            =
            *value;


        # Swap the current contents of the cell with a new value.
        #
        # This function has the effect of an
        # get_mail followed by a put_mail,
        # except that it is guaranteed to be atomic.
        #
        # It is also somewhat more efficient.
        #
        fun maildrop_swap (MAILDROP { read_q, value }, new_v)
            =
            {
                                                                                                                microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "maildrop_swap";
                log::uninterruptible_scope_mutex := 1;
                #
                case *value
                    #
                    NULL =>     {   v = call_with_current_fate
                                            (\\ get_v
                                                =
                                                {   rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), get_v));
                                                    #
                                                    mps::dispatch_next_thread__xu__noreturn ();
                                                }
                                            );

                                    value :=  THE new_v;

                                    wake_remaining_microthreads_waiting_to_read_maildrop__xu  (read_q,  new_v); # 

                                    v;
                                };

                    THE v =>    {   value := THE new_v;
                                    log::uninterruptible_scope_mutex := 0;
                                    v;
                                };
                esac;
            };


        fun maildrop_swap' (MAILDROP { read_q, value }, new_maildrop_value)
            =
            itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
            where
                fun suspend_then_eventually_fire_mailop                                                         # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                                    # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__suspend_then_eventually_fire_mailops__loop                                   # After starting a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     suspend_then_eventually_fire_mailop ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    {   (call_with_current_fate
                            (\\ fate
                                =
                                {   rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, fate));
                                    return_to__suspend_then_eventually_fire_mailops__loop ();                   # Return control to mailop.pkg.
                                                                                                impossible();   # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
                                }
                            )
                        )
                            -> v;                                                                               # Execution picks up on this line when 'get_v(v)' eventually gets called.

                        finish_do1mailoprun ();                                                                 # Remember that this do_one_mailop[] call has fired a mailop -- no other mailop in call is eligible to fire.

                        value := THE new_maildrop_value;

                        wake_remaining_microthreads_waiting_to_read_maildrop__xu (read_q, new_maildrop_value);  # 

                        v;
                    };

                fun is_mailop_ready_to_fire ()
                    =
                    case *value
                        #
                        NULL  =>    itt::UNREADY_MAILOP  suspend_then_eventually_fire_mailop;

                        THE v =>    itt::READY_MAILOP
                                      {
                                        fire_mailop => {.   value := THE new_maildrop_value;                    # Reppy refers to 'fire_mailop' as 'doFn'.
                                                            log::uninterruptible_scope_mutex := 0;
                                                            v;
                                                        }
                                      };
                    esac;
            end;

            # Convenience fn for starting up imp graphs:
            #
            fun make_run_gun ()
                =
                {   run_gun  = make_empty_maildrop ():  Maildrop(Void);
                    run_gun' = get_from_maildrop' run_gun;                                              # Build a mailop that will fire when fire_run_gun() is called, to use as a startup barrier.

                    fun fire_run_gun () =   put_in_maildrop (run_gun, ());

                    { run_gun', fire_run_gun };
                };      

            # Same as above for shutdown:
            #
            fun make_end_gun ()
                =
                {   end_gun  = make_empty_maildrop ():  Maildrop(Void);
                    end_gun' = get_from_maildrop' end_gun;                                              # Build a mailop that will fire when fire_run_gun() is called, to use as a startup barrier.

                    fun fire_end_gun () =   put_in_maildrop (end_gun, ());

                    { end_gun', fire_end_gun };
                };      


    };                                          # package maildrop 
end;

## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.




Comments and suggestions to: bugs@mythryl.org

PreviousUpNext