PreviousUpNext

15.4.904  src/lib/src/lib/thread-kit/src/core-thread-kit/mailqueue.pkg

## mailqueue.pkg                                        # Derives from cml/src/core-cml/mailbox.sml
#
# Unbounded queues of thread-to-thread mail messages.

# Compiled by:
#     src/lib/std/standard.lib




stipulate
    package fat =  fate;                                                                                                # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                                                                            # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package mps =  microthread_preemptive_scheduler;                                                                    # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    Fate(X) =   fat::Fate(X);
    #
    call_with_current_fate =  fat::call_with_current_fate;
    switch_to_fate         =  fat::switch_to_fate;
herein

    package mailqueue: (weak)
    api {
        include Mailqueue;                                                                                              # Mailqueue                     is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailqueue.api
        #
        reset_mailqueue:  Mailqueue(X) -> Void;
    }
    {
        Queue(X) =  { front:  List(X),
                      rear:   List(X)
                    };

        fun enqueue ( { front, rear }, x)
            =
            { front,
              rear => x ! rear
            };

        fun dequeue ( { front => x ! rest, rear } ) =>  ( { front=>rest, rear }, x);
            dequeue ( { front => [],       rear } ) =>  dequeue { front=>list::reverse rear, rear=> [] };
        end;
            # Remove item from queue and return it plus new queue.
            # This will raise an exception if queue is empty, but
            # caller guarantees that queue is nonempty.

        Mailqueue_State(X)
          = EMPTY      Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)))
          | NONEMPTY  (Int, Queue(X))                                                                                   # The 'Int' is a priority -- starts at 1, incremented by is_mailop_ready_to_fire ().
          ;                                                                                                             # NB: The queue of the NONEMPTY constructor should never be empty -- use EMPTY instead.

        Mailqueue(X) = MAILQUEUE  Ref( Mailqueue_State(X) );

        empty_queue =  EMPTY  { front => [],
                                rear  => []
                              };

        fun reset_mailqueue (MAILQUEUE state)
            =
            state :=  empty_queue;

        fun make_mailqueue ()
            =
            MAILQUEUE (REF empty_queue);

        fun same_mailqueue ( MAILQUEUE s1,
                             MAILQUEUE s2
                           )
            =
            s1 == s2;                                                                                                   # Compare the refcells -- each refcell is equal only to itself, and different mailqueues will always have different refcells.


        fun make__mailop_done__refcell ()
            =
            REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_thread()));


        fun end_do1mailoprun_and_return_thread (do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread_state))
                =>
                {   do1mailoprun_status :=   itt::DO1MAILOPRUN_IS_COMPLETE;
                    #
                    thread_state;
                };

            end_do1mailoprun_and_return_thread  (REF (itt::DO1MAILOPRUN_IS_COMPLETE))
                =>
                raise exception FAIL "Compiler bug:  Attempt to cancel already-cancelled transaction";                  # Never happens; here to suppress 'nonexhaustive match' compile warning.
        end;
            

        Mailqueue_Item(X)
          #
          = NO_ITEM
          #
          |    ITEM   ( Ref( itt::Do1mailoprun_Status ),                                                                # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        Fate(X),
                        Mailqueue_State(X)
                      )                                                                                                 # This really should be a record.  XXX SUCKO FIXME.
          ;

        fun mailqueue_length (mailqueue: Mailqueue(X))
            =
            {   mailqueue ->  MAILQUEUE (REF state);
                #
                case state
                    #
                    EMPTY     _                   =>  0;
                    NONEMPTY (_, { front, rear }) =>  {  (list::length front) + (list::length rear); };
                esac;
            };

        fun mailqueue_to_string (mailqueue: Mailqueue(X))
            =
            {   mailqueue ->  MAILQUEUE (REF state);
                #
                case state
                    #
                    EMPTY        { front, rear }  =>  {  f = (list::length front);  r = (list::length rear);  sprintf "EMPTY [%d|%d]=%d"            f r (f+r); };
                    NONEMPTY (i, { front, rear }) =>  {  f = (list::length front);  r = (list::length rear);  sprintf "NONEMPTY(%d,[%d|%d]=%d)"  i  f r (f+r); };
                esac;
            };


        stipulate

            fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest) =>   clean rest;
                clean l => l;
            end;

            fun clean_rev ([], l)
                    =>
                    l;

                clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest,  l)
                    =>
                    clean_rev (rest,  l);

                clean_rev (x ! rest,  l)
                    =>
                    clean_rev (rest,  x ! l);
            end;

        herein

            fun clean_and_remove (q as { front, rear } )
                =
                clean_front front
                where

                    fun clean_front []
                            =>
                            clean_rear rear;

                        clean_front f
                            =>
                            case (clean f)
                                #
                                [] => clean_rear rear;

                                ((id, k) ! rest)
                                    =>
                                    ITEM (id, k, EMPTY { front=>rest, rear } );
                            esac;
                    end

                    also
                    fun clean_rear []
                            =>
                            NO_ITEM;

                        clean_rear r
                            =>
                            case (clean_rev (r, []))
                                #
                                []             =>  NO_ITEM;
                                (id, k) ! rest =>  ITEM (id, k, EMPTY { front=>rest, rear => [] } );
                            esac;
                    end;
                end;
        end;

        fun put_in_mailqueue (MAILQUEUE qstate, x)
            =
            {
                log::uninterruptible_scope_mutex := 1;
                #
                case *qstate
                    #
                    EMPTY q
                        =>
                        case (clean_and_remove q)
                            #
                            NO_ITEM =>
                                {
                                    qstate :=  NONEMPTY (1, { front => [x], rear => [] } );
                                    #
                                    log::uninterruptible_scope_mutex := 0;
                                };

                            ITEM (do1mailoprun_status, get_fate, qstate')
                                =>
                                call_with_current_fate
                                    (fn old_fate
                                        =
                                        {   qstate :=  qstate';
                                            #
                                            mps::enqueue_old_thread_plus_old_fate_then_install_new_thread   { new_thread => end_do1mailoprun_and_return_thread  do1mailoprun_status,   old_fate };

                                            switch_to_fate  get_fate  x;                                                                # 
                                        }
                                    );
                        esac;

                    NONEMPTY (p, q)
                        => 
                        call_with_current_fate                                                          # We force a context switch here to prevent a producer from outrunning a consumer.
                            #
                            (fn fate
                                =
                                {   qstate :=  NONEMPTY (p, enqueue (q, x));
                                    #
                                    mps::yield_to_next_thread__xu  fate;
                                }
                            );
                 esac;
            };

        fun get_msg__xu (qstate, q)
            =
            {
                (dequeue q) ->   (q', msg);
                #
                case q'
                    #
                    { front => [],
                      rear  => []
                    }
                        =>   qstate :=  empty_queue;
                    _   =>   qstate :=  NONEMPTY (1, q');

                esac;

                log::uninterruptible_scope_mutex := 0;

                msg;
            };

        fun take_from_mailqueue (mq as MAILQUEUE qstate)
            =
            {
                log::uninterruptible_scope_mutex := 1;
                #
                case *qstate
                    #
                    EMPTY q
                        =>
                        {   msg =   call_with_current_fate
                                        (
                                         fn get_fate
                                            =
                                            {   qstate :=  EMPTY (enqueue (q, (make__mailop_done__refcell(), get_fate)));
                                                #
                                                mps::dispatch_next_thread__usend__noreturn ();
                                            }
                                        );

                            log::uninterruptible_scope_mutex := 0;

                            msg;
                         };

                  NONEMPTY (priority, q)
                      =>
                      get_msg__xu (qstate, q);

                esac;
            };

        fun take_from_mailqueue' (mq as MAILQUEUE qstate)
            =
            {
                fun set_up_mailopready_watch
                      {
                        do1mailoprun_status,                                            # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                            # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__set_up_mailopready_watches__loop                     # After starting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     set_up_mailopready_watch ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    {   q = case *qstate    EMPTY    q =>  q;
                            /* */           NONEMPTY _ =>  raise exception FAIL "Compiler bug: Unsupported NONEMPTY case in poll'/set_up_mailopready_watch";    # Should be impossible, since is_mailop_ready_to_fire() (below)
                            esac;                                                                                                                               # only queues us up if *qstate is EMPTY.

                        (call_with_current_fate
                            #
                            (fn get_fate
                                =
                                {   qstate :=  EMPTY (enqueue (q, (do1mailoprun_status, get_fate)));
                                    #
                                    return_to__set_up_mailopready_watches__loop ();                                                                     # Return control to mailop.pkg.
                                                                                                        raise exception FAIL "Mailqueue: impossible";   # return_to__set_up_mailopready_watches__loop() should never return.
                                }
                            )
                        )
                            -> msg;                                                                                                                     # Execution will pick up on this line when 'get_fate" is eventually called.

                        finish_do1mailoprun ();

                        log::uninterruptible_scope_mutex := 0;

                        msg;
                    };

                fun is_mailop_ready_to_fire ()
                    =
                    case *qstate
                        #
                        EMPTY _ =>   itt::MAILOP_IS_NOT_READY_TO_FIRE  set_up_mailopready_watch;
                        #
                        NONEMPTY (priority, q)
                            =>
                            {   qstate :=  NONEMPTY (priority+1, q);
                                #
                                itt::MAILOP_IS_READY_TO_FIRE
                                  {
                                    priority,
                                    fire_mailop    =>  .{   
                                                            get_msg__xu (qstate, q);
                                                        }
                                  };
                            };
                    esac;


                itt::BASE_MAILOPS  [ is_mailop_ready_to_fire ];         # Recall that in essence a base mailop *is* an is_mailop_ready_to_fire -- see  src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
            };

        fun nonblocking_take_from_mailqueue (MAILQUEUE qstate)
            =
            {
                log::uninterruptible_scope_mutex := 1;
                #
                case *qstate
                    #
                    EMPTY q
                        =>
                        {   log::uninterruptible_scope_mutex := 0;
                            NULL;
                        };

                    NONEMPTY (priority, q)
                        =>
                        THE (get_msg__xu (qstate, q));
                esac;
            };
    };                          # package mailqueue
end;



## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2013,
## released per terms of SMLNJ-COPYRIGHT.



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext