PreviousUpNext

15.4.924  src/lib/src/lib/thread-kit/src/core-thread-kit/mailqueue.pkg

## mailqueue.pkg                                        # Derives from cml/src/core-cml/mailbox.sml
#
# Unbounded queues of thread-to-thread mail messages.

# Compiled by:
#     src/lib/std/standard.lib




stipulate
    package fat =  fate;                                                                                                # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                                                                            # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package mic =  microthread;                                                                                         # microthread                           is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
    package mps =  microthread_preemptive_scheduler;                                                                    # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    Fate(X) =   fat::Fate(X);
    #
    call_with_current_fate =  fat::call_with_current_fate;
    switch_to_fate         =  fat::switch_to_fate;
herein

    package mailqueue: (weak)
    api {
        include api Mailqueue;                                                                                          # Mailqueue                             is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailqueue.api
        #
        reset_mailqueue:  Mailqueue(X) -> Void;
    }
    {
        Queue(X) =  { front:  List(X),
                      back:   List(X)
                    };

        Mailqueue_State(X)
          = EMPTY      Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)))
          | NONEMPTY   Queue(X)                                                                                         # NB: The queue of the NONEMPTY constructor should never be empty -- use EMPTY instead.
          ;

        Mailqueue(X) =  MAILQUEUE { id:         Int,                                                                    # Because sooner or later in debugging we'll want a mapping from mailqueues to other data.
                                    reader:     mic::Microthread,                                                       # The microthread reading from the mailqueue. Useful for debugging and display purposes.
                                    state:      Ref( Mailqueue_State(X) ),
                                    put_count:  Ref( Int ),                                                             # Total number of times 'put' has been called on this mailqueue.
                                    taps:       Ref( List( (Ref(Void), X -> Void) ) )                                   # Debug taps to be called by put_in_mailqueue(). The Ref(Void) values merely identify list entries for deletion.
                                  };

        empty_queue =  EMPTY  { front => [],
                                back  => []
                              };


        fun enqueue ( { front, back }, x)
            =
            { front,
              back => x ! back
            };

        fun dequeue ( { front => x ! rest, back } ) =>  ( { front=>rest, back }, x);
            dequeue ( { front => [],       back } ) =>  dequeue { front=> reverse back, back=> [] };
        end;
            # Remove item from queue and return it plus new queue.
            # This will raise an exception if queue is empty, but
            # caller guarantees that queue is nonempty.

        fun dequeue_all ( { front => x,  back => [] } ) =>  x;
            dequeue_all ( { front => [], back => y  } ) =>  reverse y;
            dequeue_all ( { front => x,  back => y  } ) =>  (x @ (reverse y));
        end;
            # Remove all items from queue and return as a list.
            # This will raise an exception if queue is empty, but
            # caller guarantees that queue is nonempty.

        fun reset_mailqueue (MAILQUEUE { state, ... })
            =
            state :=  empty_queue;

        stipulate
            #
            next_id =  REF 1;
        herein
            fun issue_unique_id ()
                =
                {   result  = *next_id;
                    next_id =  result + 1;                                                                              # Note that we cannot be pre-empted because body of fn contains no function calls.
                    result;
                };      
        end;


        fun make_mailqueue  reader                                                                                      # 'reader' is the microthread which will be reading the mailqueue -- useful to know for debugging and display.
            =
            MAILQUEUE { reader,                                                                                         # 
                        id          =>  issue_unique_id (),
                        state       =>  REF empty_queue,
                        put_count   =>  REF 0,                                                                          # Total number of times 'put' has been called on this mailqueue.
                        taps        =>  REF []  
                      };

        fun same_mailqueue ( MAILQUEUE { id => id1, ... },
                             MAILQUEUE { id => id2, ... }
                           )
            =
            id1 == id2;                                                                                                 # 


        fun make__mailop_done__refcell ()
            =
            REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_microthread()));


        fun end_do1mailoprun_and_return_thread (do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread_state))
                =>
                {   do1mailoprun_status :=   itt::DO1MAILOPRUN_IS_COMPLETE;
                    #
                    thread_state;
                };

            end_do1mailoprun_and_return_thread  (REF (itt::DO1MAILOPRUN_IS_COMPLETE))
                =>
                raise exception DIE "Compiler bug:  Attempt to cancel already-cancelled transaction";                   # Never happens; here to suppress 'nonexhaustive match' compile warning.
        end;
            

        Mailqueue_Item(X)
          #
          = NO_ITEM
          #
          |    ITEM   ( Ref( itt::Do1mailoprun_Status ),                                                                # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        Fate(X),
                        Mailqueue_State(X)
                      )                                                                                                 # This really should be a record.  XXX SUCKO FIXME.
          ;

        fun get_mailqueue_id (MAILQUEUE { id, ... })
            =
            id;

        fun get_mailqueue_reader (MAILQUEUE { reader, ... })
            =
            reader;

        fun get_mailqueue_putcount (MAILQUEUE { put_count, ... })
            =
            *put_count;

        fun get_mailqueue_length (mailqueue: Mailqueue(X))
            =
            {   mailqueue ->  MAILQUEUE { state => REF state, ... };
                #
                case state
                    #
                    EMPTY     _              =>  0;
                    NONEMPTY { front, back } =>  {  (length front) + (length back); };
                esac;
            };

        fun mailqueue_to_string (MAILQUEUE { state => (REF state), ... }, name)                                         # Debug support, primarily to textify mailqueue state for logging via log::note or such.
            =
            sprintf "{MQ:%s %s }"   name   (sprint_state  state)
            where
                fun sprint_thread  thread
                    =
                    {   thread -> itt::MICROTHREAD { thread_id, task, ... };
                        task   -> itt::APPTASK   { task_name, task_id, ... };
                        #
                        sprintf "%d:%d"  thread_id  task_id;
                    };
                #
                fun sprint_readqueue  q
                    = 
                    string::join  " "  (map  sprint_q_entry  q)
                    where
                        fun sprint_q_entry  (REF (itt::DO1MAILOPRUN_IS_COMPLETE), _)
                                =>
                                "*";

                            sprint_q_entry  (REF (itt::DO1MAILOPRUN_IS_BLOCKED  microthread), _)
                                =>
                                sprint_thread  microthread;
                        end;
                    end;

                fun sprint_state  state
                    =
                    case state
                        #
                        EMPTY    { front, back } =>  {                                           sprintf "EMPTY [%s|%s]"       (sprint_readqueue front) (sprint_readqueue back); };
                        NONEMPTY { front, back } =>  {  f = (length front);  r = (length back);  sprintf "NONEMPTY [%d|%d]=%d)"   f r (f+r); };
                    esac;
            end;



        stipulate

            fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest) =>   clean rest;
                clean l => l;
            end;

            fun clean_rev ([], l)
                    =>
                    l;

                clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest,  l)
                    =>
                    clean_rev (rest,  l);

                clean_rev (x ! rest,  l)
                    =>
                    clean_rev (rest,  x ! l);
            end;

        herein

            fun clean_and_remove (q as { front, back } )
                =
                clean_front front
                where

                    fun clean_front []
                            =>
                            clean_back back;

                        clean_front f
                            =>
                            case (clean f)
                                #
                                [] => clean_back back;

                                ((id, k) ! rest)
                                    =>
                                    ITEM (id, k, EMPTY { front=>rest, back } );
                            esac;
                    end

                    also
                    fun clean_back []
                            =>
                            NO_ITEM;

                        clean_back r
                            =>
                            case (clean_rev (r, []))
                                #
                                []             =>  NO_ITEM;
                                (id, k) ! rest =>  ITEM (id, k, EMPTY { front=>rest, back => [] } );
                            esac;
                    end;
                end;
        end;

        fun put_in_mailqueue (MAILQUEUE { state => qstate, put_count, taps, ... }, x)
            =
            {
                                                                                                        mps::assert_not_in_uninterruptible_scope "put_in_mailqueue";
                log::uninterruptible_scope_mutex := 1;
                #
                put_count :=  *put_count + 1;
                #
                apply'  *taps  (\\ (id, tap) = tap(x));
                #
                case *qstate
                    #
                    EMPTY q
                        =>
                        case (clean_and_remove q)
                            #
                            NO_ITEM =>
                                {
                                    qstate :=  NONEMPTY { front => [x], back => [] };
                                    #
                                    log::uninterruptible_scope_mutex := 0;
                                };

                            ITEM (do1mailoprun_status, get_fate, qstate')
                                =>
                                call_with_current_fate
                                    (\\ old_fate
                                        =
                                        {   qstate :=  qstate';
                                            #
                                            mps::enqueue_old_thread_plus_old_fate_then_install_new_thread   { new_thread => end_do1mailoprun_and_return_thread  do1mailoprun_status,   old_fate };

                                            switch_to_fate  get_fate  x;                                # 
                                        }
                                    );
                        esac;

                    NONEMPTY q
                        => 
                        call_with_current_fate                                                          # We force a context switch here to reduce the risk of a producer outrunning its consumer.
                            #                                                                           # XXX SUCKO FIXME I think it would be better to context-switch only when we have >=16 things in queue.
                            (\\ fate                                                                    #                 This would allow batching of xdraw commands for efficiency and improve cache locality of codetraces.
                                =
                                {   qstate :=  NONEMPTY (enqueue (q, x));
                                    #
                                    mps::yield_to_next_thread__xu  fate;
                                }
                            );
                 esac;
            };

        fun get_msg__xu (qstate, q)
            =
            {
                (dequeue q) ->   (q', msg);
                #
                case q'
                    #
                    { front => [],
                      back  => []
                    }
                        =>   qstate :=  empty_queue;
                    _   =>   qstate :=  NONEMPTY q';

                esac;

                log::uninterruptible_scope_mutex := 0;

                msg;
            };

        fun get_msgs__xu (qstate, q)
            =
            {
                msgs = dequeue_all q;
                #
                qstate :=  empty_queue;

                log::uninterruptible_scope_mutex := 0;

                msgs;
            };

        fun take_from_mailqueue (mq as MAILQUEUE { state => qstate, ... })
            =
            {
                                                                                                                        mps::assert_not_in_uninterruptible_scope "take_from_mailqueue";
                log::uninterruptible_scope_mutex := 1;
                #
                case *qstate
                    #
                    EMPTY q
                        =>
                        {   msg =   call_with_current_fate
                                        (
                                         \\ get_fate
                                            =
                                            {   qstate :=  EMPTY (enqueue (q, (make__mailop_done__refcell(), get_fate)));
                                                #
                                                mps::dispatch_next_thread__xu__noreturn ();
                                            }
                                        );

                            log::uninterruptible_scope_mutex := 0;

                            msg;
                         };

                  NONEMPTY q
                      =>
                      get_msg__xu (qstate, q);

                esac;
            };

        fun take_all_from_mailqueue (mq as MAILQUEUE { state => qstate, ... })
            =
            {
                                                                                                                        mps::assert_not_in_uninterruptible_scope "take_all_from_mailqueue";
                log::uninterruptible_scope_mutex := 1;
                #
                case *qstate
                    #
                    EMPTY q
                        =>
                        {   msg =   call_with_current_fate
                                        (
                                         \\ get_fate
                                            =
                                            {   qstate :=  EMPTY (enqueue (q, (make__mailop_done__refcell(), get_fate)));
                                                #
                                                mps::dispatch_next_thread__xu__noreturn ();
                                            }
                                        );

                            log::uninterruptible_scope_mutex := 0;

                            [ msg ];
                         };

                  NONEMPTY q
                      =>
                      get_msgs__xu (qstate, q);

                esac;
            };

        fun take_from_mailqueue' (mq as MAILQUEUE { state => qstate, ... })
            =
            {
                fun suspend_then_eventually_fire_mailop
                      {
                        do1mailoprun_status,                                            # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                            # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__suspend_then_eventually_fire_mailops__loop           # After starting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     suspend_then_eventually_fire_mailop ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    {   q = case *qstate    EMPTY    q =>  q;
                            /* */           NONEMPTY _ =>  raise exception DIE "Unsupported NONEMPTY case in suspend_then_eventually_fire_mailop";      # Should be impossible, since is_mailop_ready_to_fire() (below)
                            esac;                                                                                                                       # only queues us up if *qstate is EMPTY.

                        (call_with_current_fate
                            #
                            (\\ get_fate
                                =
                                {   qstate :=  EMPTY (enqueue (q, (do1mailoprun_status, get_fate)));
                                    #
                                    return_to__suspend_then_eventually_fire_mailops__loop ();                                                           # Return control to mailop.pkg.
                                                                                                        raise exception DIE "Mailqueue: impossible";    # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
                                }
                            )
                        )
                            -> msg;                                                                                                                     # Execution will pick up on this line when 'get_fate" is eventually called.

                        finish_do1mailoprun ();

                        log::uninterruptible_scope_mutex := 0;

                        msg;
                    };

                fun is_mailop_ready_to_fire ()
                    =
                    case *qstate
                        #
                        EMPTY _ =>   itt::UNREADY_MAILOP  suspend_then_eventually_fire_mailop;
                        #
                        NONEMPTY q
                            =>
                            {   qstate :=  NONEMPTY q;
                                #
                                itt::READY_MAILOP  {  fire_mailop =>  {.   get_msg__xu (qstate, q); }  };
                            };
                    esac;


                itt::BASE_MAILOPS  [ is_mailop_ready_to_fire ];                         # Recall that in essence a base mailop *is* an is_mailop_ready_to_fire -- see  src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
            };

        fun take_all_from_mailqueue' (mq as MAILQUEUE { state => qstate, ... })
            =
            {
                fun suspend_then_eventually_fire_mailop
                      {
                        do1mailoprun_status,                                            # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                            # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__suspend_then_eventually_fire_mailops__loop           # After starting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
                      }
                    =
                    {   q = case *qstate    EMPTY    q =>  q;
                            /* */           NONEMPTY _ =>  raise exception DIE "Unsupported NONEMPTY case in /suspend_then_eventually_fire_mailop";     # Should be impossible, since is_mailop_ready_to_fire() (below)
                            esac;                                                                                                                       # only queues us up if *qstate is EMPTY.

                        (call_with_current_fate
                            #
                            (\\ get_fate
                                =
                                {   qstate :=  EMPTY (enqueue (q, (do1mailoprun_status, get_fate)));
                                    #
                                    return_to__suspend_then_eventually_fire_mailops__loop ();                                                           # Return control to mailop.pkg.
                                                                                                        raise exception DIE "Mailqueue: impossible";    # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
                                }
                            )
                        )
                            -> msg;                                                                                                                     # Execution will pick up on this line when 'get_fate" is eventually called.

                        finish_do1mailoprun ();

                        log::uninterruptible_scope_mutex := 0;

                        [ msg ];
                    };

                fun is_mailop_ready_to_fire ()
                    =
                    case *qstate
                        #
                        EMPTY _ =>   itt::UNREADY_MAILOP  suspend_then_eventually_fire_mailop;
                        #
                        NONEMPTY q
                            =>
                            {   qstate :=  NONEMPTY q;
                                #
                                itt::READY_MAILOP  {  fire_mailop  =>  {. get_msgs__xu (qstate, q); } };
                            };
                    esac;


                itt::BASE_MAILOPS  [ is_mailop_ready_to_fire ];         # Recall that in essence a base mailop *is* an is_mailop_ready_to_fire -- see  src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
            };

        fun note_mailqueue_tap (MAILQUEUE { taps, ... }, tap)
            =
            {   id = REF ();
                taps :=  (id, tap) ! *taps;
                id;
            };

        fun drop_mailqueue_tap (MAILQUEUE { taps, ... }, id_to_drop)
            =
            taps :=  list::remove
                        (\\ (id, tap) =  id == id_to_drop)
                        *taps;

    };                          # package mailqueue
end;



## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext