PreviousUpNext

15.4.902  src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.pkg

## maildrop.pkg                                                 # Derives from   cml/src/core-cml/sync-var.sml
#
# Maildrops are essentially concurrency-safe replacements for REF cells.

# Compiled by:
#     src/lib/std/standard.lib



###          "We're fools whether we dance or not,
###           so we might as well dance."
###
###                   -- Japanese proverb



stipulate
    package fat =  fate;                                        # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                    # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package rwq =  rw_queue;                                    # rw_queue                              is from   src/lib/src/rw-queue.pkg
    package mps =  microthread_preemptive_scheduler;            # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    call_with_current_fate =  fat::call_with_current_fate;
    switch_to_fate         =  fat::switch_to_fate;
herein

    package   maildrop
    :         Maildrop                                          # Maildrop                      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.api
    {
        exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;                   # Raised by put_in_maildrop when the maildrop already holds a value.
        #
        Fate(X) =   fat::Fate(X);                                       # Local abbreviation for the Fate type of package 'fate'.
        #
        # A maildrop consists of three mutable parts:
        #
        #   priority:  Integer counter.  It is incremented by bump_priority and
        #              set to 1 by put_in_maildrop when that wakes a blocked reader.
        #
        #   read_q:    Queue of blocked readers.  Each entry pairs a do1mailoprun
        #              status cell with the fate to resume once a value is available.
        #
        #   value:     Current contents:  NULL when the maildrop is empty,
        #              THE v  when it holds value v.
        #
        Maildrop(X) =   MAILDROP
                          {
                            priority:  Ref( Int ),
                            read_q:    rwq::Rw_Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)) ),
                            value:     Ref(  Null_Or(X) )
                          };


        fun maildrop_to_string (MAILDROP { priority => REF p, read_q, value })                                          # Debug support, primarily to textify mailslot state for logging via log::note or such.
            =
            {   q_len = list::length (rwq::to_list read_q);
                sprintf "prio=%d len(in_q)=%d value is %s"  p  q_len  case *value NULL => "EMPTY"; _ => "FULL"; esac;
            };

        fun make_cell ()
            =
            MAILDROP  { priority =>  REF 0,
                        value    =>  REF NULL,
                        read_q   =>  rwq::make_rw_queue ()
                      };

        fun same_cell ( MAILDROP { value => v1, ... },
                        MAILDROP { value => v2, ... }
                      )
            =
            v1 == v2;


        fun make_do1mailoprun_status ()
            =
            REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_thread()));


        fun mark_do1mailoprun_complete_and_return_thread  (do1mailoprun_status  as  REF  (itt::DO1MAILOPRUN_IS_BLOCKED  thread_id))
                =>
                {   do1mailoprun_status :=   itt::DO1MAILOPRUN_IS_COMPLETE;
                    #
                    thread_id;
                };

            mark_do1mailoprun_complete_and_return_thread  (REF (itt::DO1MAILOPRUN_IS_COMPLETE))
                =>
                raise exception FAIL "Compiler bug:  Attempt to cancel already-cancelled transaction-id";                       # Never happens; here to suppress 'nonexhaustive match' compile warning.
        end;


        fun bump_priority (p as REF n)                                                                          # Bump a priority value by one, returning the old value.
            =
            {   p := n+1;
                n;
            };


        # Result type of clean_and_remove:
        # NO_ITEM when the read queue holds no live entry,
        # otherwise ITEM carrying the dequeued (status-cell, fate) pair.
        #
        Qy_Item(X)
          #
          = NO_ITEM
          | ITEM  ((Ref itt::Do1mailoprun_Status,  Fate(X)))
          ;

        # Functions to clean completed (cancelled) transactions out of a maildrop's read queue.
        #
        stipulate

            fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest)                           # Drop any cancelled transactions at start of list.
                    =>
                    clean rest;

                clean l  =>  l;
            end;

            fun clean_rev ([], result)                                                          # Drop all cancelled transactions from list; result is in reverse order.
                    =>
                    result;

                clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest,  result)
                    =>
                    clean_rev (rest, result);

                clean_rev (x ! rest,  result)
                    =>
                    clean_rev (rest, x ! result);
            end;

        herein

            # Clean completed transactions out of the queue and report
            # whether any live entry remains:
            #
            #   - Returns 0 when neither the front nor the back list holds a live entry.
            #   - Otherwise stores the cleaned list in 'front', increments 'priority'
            #     and returns the value 'priority' held before the increment.
            #
            fun clean_and_check (priority, rwq::RW_QUEUE { front, back } )
                =
                clean_front *front
                where
                    fun clean_front []                                                          # Front list is empty: fall back to the back list.
                            =>
                            clean_back *back;

                        clean_front f
                            =>
                            case (clean f)                                                      # Drop completed transactions at the head of the front list.
                                #
                                [] =>   clean_back  *back;                                      # Nothing live in front; try the back list.  ('front' itself is not updated on this path.)

                                f' =>   {   front :=  f';                                       # Head of f' is a live entry.
                                            #
                                            bump_priority  priority;
                                        };
                            esac;
                    end

                    also
                    fun clean_back []                                                           # No entries anywhere: report 0.
                            =>
                            0;

                        clean_back r
                            =>
                            {   back :=  [];                                                    # Survivors of the back list move to the front list below.
                                #
                                case (clean_rev (r, []))                                        # Drop all completed entries; result is the back list reversed.
                                    #
                                    [] =>   0;

                                    rr =>   {   front :=  rr;
                                                #
                                                bump_priority  priority;
                                            };
                                esac;
                            };
                    end;
                end;


            # Remove and return the first live entry of the queue,
            # discarding completed (cancelled) transactions met on the way.
            #
            # Returns  ITEM entry  for the dequeued entry,
            # or       NO_ITEM     when the queue holds no live entry.
            #
            fun clean_and_remove (rwq::RW_QUEUE { front, back, ... } )
                =
                clean_front *front
                where
                    fun clean_front [] =>   clean_back  *back;                                  # Front list is empty: fall back to the back list.
                        #
                        clean_front f
                            =>
                            case (clean f)                                                      # Drop completed transactions at the head of the front list.
                                #
                                []            =>    clean_back  *back;                          # Nothing live in front; try the back list.

                                (item ! rest) =>    {   front :=  rest;                         # Dequeue the first live entry.
                                                        #
                                                        ITEM  item;
                                                    };
                            esac;
                    end

                    also
                    fun clean_back [] =>   NO_ITEM;
                        #
                        clean_back r
                            =>
                            {   back :=  [];                                                    # Survivors of the back list move to the front list below.
                                #
                                case (clean_rev (r, []))                                        # Drop all completed entries; result is the back list reversed.
                                    #
                                    [] => NO_ITEM;

                                    item ! rest
                                        =>
                                        {   front :=  rest;                                     # Dequeue the first live entry; the rest become the new front list.
                                            #
                                            ITEM item;
                                        };
                                esac;
                            };
                    end;
                end;

            # Append 'item' to the queue, first discarding completed
            # (cancelled) transactions from the head of the queue.
            #
            # New entries are consed onto the 'back' list; when the front list
            # has no live entry, the back list is cleaned and reversed into 'front'.
            #
            fun clean_and_enqueue (rwq::RW_QUEUE { front, back, ... }, item)
                =
                clean_front  *front
                where
                    fun clean_front [] =>   clean_back  *back;                                  # Front list is empty: fall back to the back list.
                        #
                        clean_front f
                            =>
                            case (clean f)                                                      # Drop completed transactions at the head of the front list.
                                #
                                [] =>   clean_back *back;                                       # Nothing live in front; clean_back will overwrite 'front'.

                                f' =>   {   front := f';
                                            #
                                            back  := item ! *back;                              # Push the new entry onto the back list.
                                        };
                            esac;
                    end

                    also
                    fun clean_back []                                                           # No live entry anywhere: the new item becomes the whole queue.
                            =>
                            front := [item];

                        clean_back r
                            =>
                            case (clean_rev (r, []))                                            # Drop all completed entries; result is the back list reversed.
                                #
                                [] => {  front := [item];  back  := []; };                      # Everything in the back list was cancelled.
                                rr => {  back  := [item];  front := rr; };                      # Survivors become the front list; the new item starts a fresh back list.
                            esac;
                    end;
                end;
        end;                                    # stipulate


        # When a thread is resumed after being blocked
        # on a maildrop op there may be other threads
        # also blocked on the same maildrop.
        #
        # This function is used to propagate the message
        # to the other blocked threads:  it dequeues the next
        # live reader from read_q and resumes it with 'msg'.
        # The peek and swap operations in this file call it
        # again when they resume, passing the message on.
        #
        # It must be called from an atomic region, i.e. with
        # log::uninterruptible_scope_mutex set, as the callers in
        # this file do.  When read_q holds no more live readers
        # we leave the atomic region by clearing that mutex.
        #
        # We must use "clean_and_remove" to get items
        # from read_q in the unlikely event that
        # a single thread executes a choice of
        # multiple gets on the same maildrop.
        #
        fun relay_msg (read_q, msg)
            =
            case (clean_and_remove  read_q)
                #
                NO_ITEM =>   log::uninterruptible_scope_mutex := 0;                                                             # Nobody left to notify: end the atomic region.

                ITEM (do1mailoprun_status, fate)
                    =>
                    call_with_current_fate
                        (fn old_fate
                            =
                            {   mps::enqueue_old_thread_plus_old_fate_then_install_new_thread  { new_thread => mark_do1mailoprun_complete_and_return_thread  do1mailoprun_status,   old_fate };
                                #
                                switch_to_fate  fate  msg;                                                                      # Resume the blocked reader, handing it the message.
                            }
                        );
            esac;

        fun impossible ()
            =
            raise exception  FAIL "maildrop: impossible";


        # M-variables:
        #
        make_empty_maildrop
            =
            make_cell;


        fun make_maildrop  initial_value
            =
            MAILDROP { priority =>  REF 0,
                       read_q   =>  rwq::make_rw_queue (),
                       value    =>  REF (THE initial_value)
                     };

        same_maildrop =  same_cell;


        # Store maildrop_value in an empty maildrop.
        #
        # If a live reader is blocked on read_q, the current thread is
        # queued via the scheduler and control switches to that reader's
        # fate, handing it the new value.
        #
        # Raises MAY_NOT_FILL_ALREADY_FULL_MAILDROP if the maildrop already holds a value.
        #
        fun put_in_maildrop (maildrop as MAILDROP { priority, read_q, value }, maildrop_value)
            =
            {
                log::uninterruptible_scope_mutex := 1;                                                                          # Begin uninterruptible scope.
                #
                case *value
                    #
                    NULL =>
                        {   value := THE maildrop_value;
                            #
                            case (clean_and_remove read_q)
                                #
                                NO_ITEM =>  {                                                                                   # Nobody is waiting: just end the uninterruptible scope.
                                                log::uninterruptible_scope_mutex := 0;
                                            };

                                ITEM (do1mailoprun_status, fate)
                                    =>
                                    {
                                        call_with_current_fate
                                            #
                                            (fn old_fate
                                                =
                                                {   mps::enqueue_old_thread_plus_old_fate_then_install_new_thread   { new_thread => mark_do1mailoprun_complete_and_return_thread  do1mailoprun_status,   old_fate };
                                                    #
                                                    priority := 1;

                                                    switch_to_fate  fate  maildrop_value;                                                                       # Resume the blocked reader with the new value; the mutex is left set for the reader to clear.
                                                }
                                            );
                                    };
                            esac;
                        };

                    THE _ =>
                        {
                            log::uninterruptible_scope_mutex := 0;                                                              # End uninterruptible scope before raising.
                            #
                            raise exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;
                        };
                esac;
            };


        # Mailop version of take_from_maildrop.
        #
        # Yields a BASE_MAILOPS list holding one poll fn, is_mailop_ready_to_fire:
        #
        #   - If the maildrop is full, the poll fn reports MAILOP_IS_READY_TO_FIRE;
        #     firing empties the maildrop and returns the value it held.
        #
        #   - If the maildrop is empty, the poll fn reports MAILOP_IS_NOT_READY_TO_FIRE
        #     and supplies set_up_mailopready_watch, which queues the caller's fate on read_q.
        #
        # NOTE(review): nothing here sets log::uninterruptible_scope_mutex; presumably
        # the caller in mailop.pkg does so before polling -- confirm.
        #
        fun take_from_maildrop' (MAILDROP { priority, read_q, value } )
            =
            itt::BASE_MAILOPS [is_mailop_ready_to_fire]
            where
                fun set_up_mailopready_watch                                            # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
                      {
                        do1mailoprun_status,                                            # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                            # This typically does  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and then sends nacks as appropriate.
                        return_to__set_up_mailopready_watches__loop                     # After starting up a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     set_up_mailopready_watch ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    {
                        (call_with_current_fate
                            (fn fate
                                =
                                {   rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, fate));            # Register ourselves as a blocked reader.
                                    #
                                    return_to__set_up_mailopready_watches__loop ();                             # Return control to mailop.pkg
                                                                                                impossible ();  # return_to__set_up_mailopready_watches__loop() should never return.
                                }
                            )
                        )
                            ->  maildrop_value;                                                                 # Execution will pick up here when put_in_maildrop (above) eventually does:   switch_to_fate  fate  maildrop_value;

                        finish_do1mailoprun ();                                                                 # Do stuff like   do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        value := NULL;                                                                          # Empty the maildrop.

                        log::uninterruptible_scope_mutex := 0;                                                  # End uninterruptible scope.

                        maildrop_value;                                                                         # Return value read from maildrop.
                    };

                fun is_mailop_ready_to_fire ()                                                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'
                    =
                    case *value
                        #
                        NULL  =>    itt::MAILOP_IS_NOT_READY_TO_FIRE  set_up_mailopready_watch;                 # Maildrop is empty: caller must block.
                        #
                        THE v =>    itt::MAILOP_IS_READY_TO_FIRE
                                      {
                                        priority    =>   bump_priority  priority,

                                        fire_mailop => .{   value := NULL;                              # Reppy refers to 'fire_mailop' as 'doFn'.
                                                            log::uninterruptible_scope_mutex := 0;
                                                            v;
                                                        }
                                      };
                    esac;
            end;


        fun nonblocking_take_from_maildrop (MAILDROP { priority, read_q, value } )
            =
            {
                log::uninterruptible_scope_mutex := 1;
                #
                case *value
                    #
                    THE v =>    {   value := NULL;
                                    log::uninterruptible_scope_mutex := 0;
                                    THE v;
                                };

                    NULL =>     NULL;
                esac;
            };


        # Take the value from a maildrop, leaving it empty.
        #
        # If the maildrop is empty the calling thread queues its fate
        # on read_q and dispatches another thread; it resumes here with
        # the value once some fate-switch (e.g. in put_in_maildrop) supplies one.
        #
        fun take_from_maildrop (maildrop as MAILDROP { priority, read_q, value } )
            =
            {
                log::uninterruptible_scope_mutex := 1;                                                          # Begin uninterruptible scope.
                #
                case *value
                    #
                    NULL =>
                        {
                            v = call_with_current_fate
                                    (fn fate
                                        =
                                        {
                                            rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), fate));     # Register ourselves as a blocked reader.
                                            #
                                            mps::dispatch_next_thread__usend__noreturn ();                              # Run some other thread until a value arrives.
                                        }
                                    );

                            value := NULL;                                                                      # We have been resumed with the value: empty the maildrop again.
                            log::uninterruptible_scope_mutex := 0;                                              # End uninterruptible scope.
                            v;
                        };

                    THE v =>
                        {
                            value := NULL;                                                                      # Empty the maildrop.
                            log::uninterruptible_scope_mutex := 0;                                              # End uninterruptible scope.
                            v;
                        };
                esac;
            };


        # Read the value of a maildrop without emptying it.
        #
        # If the maildrop is empty the calling thread queues its fate
        # on read_q and dispatches another thread.  Once resumed with
        # a value it passes that value on via relay_msg, then returns it.
        #
        fun peek_in_maildrop (maildrop as MAILDROP { priority, read_q, value } )
            =
            {
                log::uninterruptible_scope_mutex := 1;                                                                  # Begin uninterruptible scope.
                #
                case *value
                    #
                    NULL =>     {
                                    v =  call_with_current_fate
                                            (fn fate
                                                =
                                                {
                                                    rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), fate));     # Register ourselves as a blocked reader.
                                                    #
                                                    mps::dispatch_next_thread__usend__noreturn ();                              # Run some other thread until a value arrives.
                                                }
                                            );

                                    relay_msg (read_q, v);                                                              # Pass the value on to the next blocked reader; relay_msg clears the mutex if there is none.

                                    v;
                                };

                    THE v =>    {
                                    log::uninterruptible_scope_mutex := 0;                                              # End uninterruptible scope.
                                    v;
                                };
                esac;
            };


        # Mailop version of peek_in_maildrop.
        #
        # Yields a BASE_MAILOPS list holding one poll fn, is_mailop_ready_to_fire:
        #
        #   - If the maildrop is full, the poll fn reports MAILOP_IS_READY_TO_FIRE;
        #     firing returns the value and leaves the maildrop full.
        #
        #   - If the maildrop is empty, the poll fn reports MAILOP_IS_NOT_READY_TO_FIRE
        #     and supplies set_up_mailopready_watch, which queues the caller's fate on read_q.
        #
        fun peek_in_maildrop' (maildrop as MAILDROP { priority, read_q, value } )
            =
            itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
            where
                fun set_up_mailopready_watch                                            # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
                      {
                        do1mailoprun_status,                                            # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                            # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__set_up_mailopready_watches__loop                     # After starting a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     set_up_mailopready_watch ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    {
                        (call_with_current_fate
                            (fn fate
                                =
                                {
                                    rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, fate));           # Register ourselves as a blocked reader.
                                    return_to__set_up_mailopready_watches__loop ();                             # Return control to mailop.pkg.
                                                                                            impossible ();      # return_to__set_up_mailopready_watches__loop should never return.
                                }
                            )
                        )
                            -> v;                                                                               # Execution will resume on this line when 'fate' is eventually called.

                        finish_do1mailoprun();
                        relay_msg (read_q, v);                                                                  # Pass the value on to the next blocked reader; relay_msg clears the mutex if there is none.
                        v;
                    };

                fun is_mailop_ready_to_fire ()                                                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
                    =
                    case *value
                        #
                        NULL  =>    {
                                        itt::MAILOP_IS_NOT_READY_TO_FIRE  set_up_mailopready_watch;             # Maildrop is empty: caller must block.
                                    };
                        #
                        THE v =>    {
                                        itt::MAILOP_IS_READY_TO_FIRE
                                          {
                                            priority    =>   bump_priority priority,
                                            #
                                            fire_mailop => .{   log::uninterruptible_scope_mutex := 0;          # Reppy refers to 'fire_mailop' as 'doFn'.
                                                                v;
                                                            }
                                          };
                                    };
                    esac;
            end;



        fun nonblocking_peek_in_maildrop (maildrop as MAILDROP { priority, read_q, value } )
            =
            {
                log::uninterruptible_scope_mutex := 1;
                #
                case *value
                    #
                    THE v =>    {
                                    log::uninterruptible_scope_mutex := 0;
                                    #
                                    THE v;
                                };

                    NULL =>     {
                                    log::uninterruptible_scope_mutex := 0;
                                    NULL;
                                };
                esac;
            };


        # Swap the current contents of the cell with a new value.
        #
        # This function has the effect of a
        # take_from_maildrop followed by a put_in_maildrop,
        # except that it is guaranteed to be atomic.
        #
        # It is also somewhat more efficient.
        #
        # If the maildrop is empty the caller blocks until a value
        # arrives; that value is returned and new_v takes its place.
        #
        fun maildrop_swap (MAILDROP { priority, read_q, value }, new_v)
            =
            {
                log::uninterruptible_scope_mutex := 1;                                                  # Begin uninterruptible scope.
                #
                case *value
                    #
                    NULL =>     {   v = call_with_current_fate
                                            (fn fate
                                                =
                                                {   rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), fate));     # Register ourselves as a blocked reader.
                                                    #
                                                    mps::dispatch_next_thread__usend__noreturn ();                              # Run some other thread until a value arrives.
                                                }
                                            );

                                    value :=  THE new_v;                                                # Replace the value we were handed with the new one.

                                    relay_msg  (read_q,  new_v);                                        # Relay the new value to any other blocked threads.

                                    v;
                                };

                    THE v =>    {   value := THE new_v;
                                    log::uninterruptible_scope_mutex := 0;                              # End uninterruptible scope.
                                    v;
                                };
                esac;
            };


        # Mailop version of maildrop_swap.
        #
        # Yields a BASE_MAILOPS list holding one poll fn, is_mailop_ready_to_fire:
        #
        #   - If the maildrop is full, the poll fn reports MAILOP_IS_READY_TO_FIRE;
        #     firing stores new_maildrop_value and returns the value previously held.
        #
        #   - If the maildrop is empty, the poll fn reports MAILOP_IS_NOT_READY_TO_FIRE
        #     and supplies set_up_mailopready_watch, which queues the caller's fate on read_q.
        #
        fun maildrop_swap' (MAILDROP { priority, read_q, value }, new_maildrop_value)
            =
            itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
            where
                fun set_up_mailopready_watch                                                            # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                            # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                            # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__set_up_mailopready_watches__loop                                     # After starting a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     set_up_mailopready_watch ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    {   (call_with_current_fate
                            (fn fate
                                =
                                {   rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, fate));           # Register ourselves as a blocked reader.
                                    return_to__set_up_mailopready_watches__loop ();                             # Return control to mailop.pkg.
                                                                                                impossible();   # return_to__set_up_mailopready_watches__loop() should never return.
                                }
                            )
                        )
                            -> old_maildrop_value;                                                              # Execution picks up on this line when 'fate' eventually gets called.

                        finish_do1mailoprun ();
                        value := THE new_maildrop_value;                                                        # Replace the value we were handed with the new one.
                        relay_msg (read_q, new_maildrop_value);                                                 # Relay the new value to the next blocked reader; relay_msg clears the mutex if there is none.
                        old_maildrop_value;
                    };

                fun is_mailop_ready_to_fire ()                                                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
                    =
                    case *value
                        #
                        NULL  =>    itt::MAILOP_IS_NOT_READY_TO_FIRE  set_up_mailopready_watch;                 # Maildrop is empty: caller must block.

                        THE v =>    itt::MAILOP_IS_READY_TO_FIRE
                                      {
                                        priority    =>   bump_priority priority,
                                        #
                                        fire_mailop => .{   value := THE new_maildrop_value;                            # Reppy refers to 'fire_mailop' as 'doFn'.
                                                            log::uninterruptible_scope_mutex := 0;
                                                            v;
                                                        }
                                      };
                    esac;
            end;

    };                                          # package maildrop 
end;

## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2013,
## released per terms of SMLNJ-COPYRIGHT.




Comments and suggestions to: bugs@mythryl.org

PreviousUpNext