PreviousUpNext

15.4.935  src/lib/src/lib/thread-kit/src/core-thread-kit/timeout-mailop.pkg

## timeout-mailop.pkg
#
# Mailops that wait until a given time.

# Compiled by:
#     src/lib/std/standard.lib





stipulate
    package fat =  fate;                                                # fate                                  is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;                            # internal_threadkit_types              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package mop =  mailop;                                              # mailop                                is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
    package mps =  microthread_preemptive_scheduler;                    # microthread_preemptive_scheduler      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    package tim =  time;                                                # time                                  is from   src/lib/std/time.pkg
    #
    Mailop(X) =  mop::Mailop(X);
herein

    package timeout_mailop: (weak)  api {
        #
        include api Timeout_Mailop;                                     # Timeout_Mailop                        is from   src/lib/src/lib/thread-kit/src/core-thread-kit/timeout-mailop.api

        reset_sleep_queue_to_empty
            :
            Void -> Void;

        wake_sleeping_threads_whose_time_has_come__iu
            :
            Void -> Void;

        time_until_next_sleeping_thread_wakes
            :
            Void -> Null_Or( tim::Time );
    }

    {
        # The list of threads waiting for timeouts.
        # It is sorted in increasing order
        # of time value.
        #
        # NOTE: We may want to use some sort of
        # balanced search package in the future.                XXX BUGGO FIXME
        #
        Item = ( tim::Time,
                 Void -> Void,
                 Ref (itt::Do1mailoprun_Status),
                 fat::Fate( Void )
               );
        #
        sleep_queue
            =
            REF ([]: List( Item ));


        fun time_wait (time, finish_do1mailoprun, do1mailoprun_status, fate)
            =
            sleep_queue := insert *sleep_queue
            where
                fun insert []
                        =>
                        [ (time, finish_do1mailoprun, do1mailoprun_status, fate) ];

                    insert ((_, _, REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest)
                        =>
                        # Drop completed transaction in passing:
                        #
                        insert rest;

                    insert (list as ((item as (time', _, _, _)) ! rest))
                       =>
                       tim::(<) (time', time)  ??  item ! insert rest
                                               ::  (time, finish_do1mailoprun, do1mailoprun_status, fate) ! list;
                end;
            end;


        # Drop all completed transactions from itemlist.
        # Return cleaned list:
        #
        fun drop_cancelled_queue_entries  items
            =
            drop_them  items
            where
                fun drop_them ((_, _, REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest)
                        =>
                        drop_them  rest;

                    drop_them (item ! rest)
                        =>
                        item  !  drop_them  rest;

                    drop_them [] => [];
                end;
            end;


        # Find all sleeping threads whose
        # time has come and move them to
        # run queue.
        #
        # Return list of still-sleeping threads.
        #
        fun wake_and_remove_sleeping_threads_whose_time_has_come  q
            =
            wake_them q
            where
                now =  mps::get_approximate_time ();

                fun wake_them ((_, _, REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest)
                        =>
                        wake_them rest;

                    wake_them (list as ((item as (t', f, do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread), fate)) ! rest))
                        =>
                        if (tim::(<=) (t', now))
                            #
                            mps::push_into_run_queue (thread, fate);
                            f ();                                                       # Cleanup function. 
                            wake_them  rest;
                        else
                            drop_cancelled_queue_entries  list;
                        fi;

                   wake_them [] => [];
               end;
            end;


        fun time_until_next_sleeping_thread_wakes ()
            =
            case (drop_cancelled_queue_entries *sleep_queue)
                #
                []  =>  NULL;

                (q as ((t, _, _, _) ! _))
                    =>
                    {   now =  mps::get_approximate_time ();
                        #
                        tim::(<=) (t, now)
                            ##
                            ??   THE (tim::zero_time)
                            ::   THE (tim::(-) (t, now));
                    };
            esac;


        fun wake_sleeping_threads_whose_time_has_come__iu ()
            =
            case *sleep_queue
                #
                []     =>   ();
                #
                queue  =>   sleep_queue
                                :=
                                wake_and_remove_sleeping_threads_whose_time_has_come
                                    queue;
            esac;


        fun reset_sleep_queue_to_empty ()
            =
            sleep_queue := [];


        # NOTE: Unlike other base mail_ops, the
        #           suspend_then_eventually_fire_mailop()
        #       fns of time-out mail_ops do not have to exit the
        #       uninterruptible scope or execute the clean-up
        #       operation -- this is done when they are removed
        #       from the wait queue.
        #
        fun timeout_in'  (sleep_duration: Float)
            =
            itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
            where
                sleep_duration =  time::from_float_seconds  sleep_duration;

                fun suspend_then_eventually_fire_mailop                                         # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                    # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__suspend_then_eventually_fire_mailops__loop                   # After setting up a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     suspend_then_eventually_fire_mailop ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    { # now =    mps::get_approximate_time ();                                  # Replaced by below 2012-02-01 CrT because 100ms wait was coming back after 99ms, triggering 'make check' alarm.
                        now =    tim::get_current_time_utc ();                                  # 2012-07-21 CrT: Making a kernel call every time is probably too expensive, I think maybe we need to redefine the
                        #                                                                       #                 semantics to include some level of allowed slop, or add that much slop to time-to-sleep or such. XXX SUCKO FIXME.
                        fat::call_with_current_fate
                            (\\ fate
                                =
                                {
                                    time_wait
                                        ( tim::(+) (sleep_duration, now),
                                          finish_do1mailoprun,
                                          do1mailoprun_status,
                                          fate
                                        );

                                    return_to__suspend_then_eventually_fire_mailops__loop ();   # Return control to mailop.pkg.
                                }
                            );

#                       log::uninterruptible_scope_mutex := 0;                                  # Execution will resume on this line when 'fate' is eventually called.
                    };

                fun is_mailop_ready_to_fire ()                                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
                    =
                    if (sleep_duration == tim::zero_time)
                        #
                        itt::READY_MAILOP
                          { fire_mailop =>  {. log::uninterruptible_scope_mutex := 0; }         # Reppy refers to 'fire_mailop' as 'doFn'.
                          };
                    else
                        itt::UNREADY_MAILOP suspend_then_eventually_fire_mailop;
                    fi;

            end;


        fun sleep_for  (sleep_duration: Float)
            =
            mop::block_until_mailop_fires  (timeout_in'  sleep_duration);

        fun timeout_at'  wakeup_time
            =
            itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
            where
                fun suspend_then_eventually_fire_mailop                                         # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
                      {
                        do1mailoprun_status,                                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                        finish_do1mailoprun,                                                    # Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                        return_to__suspend_then_eventually_fire_mailops__loop                   # After setting up a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
                      }
                    =
                    # This fn gets used in
                    #
                    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
                    #
                    # when a
                    #
                    #     do_one_mailop [ ... ]
                    #
                    # call has no mailops ready to fire.  'do_one_mailop' must then block until
                    # at least one mailop is ready to fire.  It does this by calling the
                    #
                    #     suspend_then_eventually_fire_mailop ()
                    #
                    # fn on each mailop in the list; each such call will typically
                    # make an entry in one or more run queues of blocked threads.
                    #
                    # The first mailop to fire cancels the rest by doing
                    #
                    #     do1mailoprun_status :=  DO1MAILOPRUN_IS_COMPLETE;
                    #
                    {   fat::call_with_current_fate
                            (
                             \\ fate
                                =
                                {   time_wait (wakeup_time, finish_do1mailoprun, do1mailoprun_status, fate);
                                    #
                                    return_to__suspend_then_eventually_fire_mailops__loop ();                   # This never returns.
                                }
                            );

#                       log::uninterruptible_scope_mutex := 0;                                                  # Execution will resume on this line when 'fate' is eventually called.
                    };

                fun is_mailop_ready_to_fire ()                                                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
                    =
                    if (tim::(<=) (wakeup_time, mps::get_approximate_time ()))
                        #
                        itt::READY_MAILOP { fire_mailop => {. log::uninterruptible_scope_mutex := 0; } };       # Reppy refers to 'fire_mailop' as 'doFn'.
                    else
                        itt::UNREADY_MAILOP  suspend_then_eventually_fire_mailop;
                    fi;
            end;


        fun sleep_until  wakeup_time
            =
            mop::block_until_mailop_fires  (timeout_at'  wakeup_time);
    };
end;



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext