## timeout-mailop.pkg
#
# Mailops that wait until a given time.
# Compiled by:
#
src/lib/std/standard.libstipulate
package fat = fate; # fate is from
src/lib/std/src/nj/fate.pkg package itt = internal_threadkit_types; # internal_threadkit_types is from
src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg package mop = mailop; # mailop is from
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg package mps = microthread_preemptive_scheduler; # microthread_preemptive_scheduler is from
src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg package tim = time; # time is from
src/lib/std/time.pkg #
Mailop(X) = mop::Mailop(X);
herein
package timeout_mailop: (weak) api {
#
include api Timeout_Mailop; # Timeout_Mailop is from
src/lib/src/lib/thread-kit/src/core-thread-kit/timeout-mailop.api reset_sleep_queue_to_empty
:
Void -> Void;
wake_sleeping_threads_whose_time_has_come__iu
:
Void -> Void;
time_until_next_sleeping_thread_wakes
:
Void -> Null_Or( tim::Time );
}
{
# The list of threads waiting for timeouts.
# It is sorted in increasing order
# of time value.
#
# NOTE: We may want to use some sort of
# balanced search package in the future. XXX BUGGO FIXME
#
Item = ( tim::Time,
Void -> Void,
Ref (itt::Do1mailoprun_Status),
fat::Fate( Void )
);
#
sleep_queue
=
REF ([]: List( Item ));
fun time_wait (time, finish_do1mailoprun, do1mailoprun_status, fate)
=
sleep_queue := insert *sleep_queue
where
fun insert []
=>
[ (time, finish_do1mailoprun, do1mailoprun_status, fate) ];
insert ((_, _, REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest)
=>
# Drop completed transaction in passing:
#
insert rest;
insert (list as ((item as (time', _, _, _)) ! rest))
=>
tim::(<) (time', time) ?? item ! insert rest
:: (time, finish_do1mailoprun, do1mailoprun_status, fate) ! list;
end;
end;
# Drop all completed transactions from itemlist.
# Return cleaned list:
#
fun drop_cancelled_queue_entries items
=
drop_them items
where
fun drop_them ((_, _, REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest)
=>
drop_them rest;
drop_them (item ! rest)
=>
item ! drop_them rest;
drop_them [] => [];
end;
end;
# Find all sleeping threads whose
# time has come and move them to
# run queue.
#
# Return list of still-sleeping threads.
#
fun wake_and_remove_sleeping_threads_whose_time_has_come q
=
wake_them q
where
now = mps::get_approximate_time ();
fun wake_them ((_, _, REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest)
=>
wake_them rest;
wake_them (list as ((item as (t', f, do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread), fate)) ! rest))
=>
if (tim::(<=) (t', now))
#
mps::push_into_run_queue (thread, fate);
f (); # Cleanup function.
wake_them rest;
else
drop_cancelled_queue_entries list;
fi;
wake_them [] => [];
end;
end;
fun time_until_next_sleeping_thread_wakes ()
=
case (drop_cancelled_queue_entries *sleep_queue)
#
[] => NULL;
(q as ((t, _, _, _) ! _))
=>
{ now = mps::get_approximate_time ();
#
tim::(<=) (t, now)
##
?? THE (tim::zero_time)
:: THE (tim::(-) (t, now));
};
esac;
fun wake_sleeping_threads_whose_time_has_come__iu ()
=
case *sleep_queue
#
[] => ();
#
queue => sleep_queue
:=
wake_and_remove_sleeping_threads_whose_time_has_come
queue;
esac;
fun reset_sleep_queue_to_empty ()
=
sleep_queue := [];
# NOTE: Unlike other base mail_ops, the
# suspend_then_eventually_fire_mailop()
# fns of time-out mail_ops do not have to exit the
# uninterruptible scope or execute the clean-up
# operation -- this is done when they are removed
# from the wait queue.
#
fun timeout_in' (sleep_duration: Float)
=
itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
where
sleep_duration = time::from_float_seconds sleep_duration;
fun suspend_then_eventually_fire_mailop # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__suspend_then_eventually_fire_mailops__loop # After setting up a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
}
=
# This fn gets used in
#
#
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# suspend_then_eventually_fire_mailop ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{ # now = mps::get_approximate_time (); # Replaced by below 2012-02-01 CrT because 100ms wait was coming back after 99ms, triggering 'make check' alarm.
now = tim::get_current_time_utc (); # 2012-07-21 CrT: Making a kernel call every time is probably too expensive, I think maybe we need to redefine the
# # semantics to include some level of allowed slop, or add that much slop to time-to-sleep or such. XXX SUCKO FIXME.
fat::call_with_current_fate
(\\ fate
=
{
time_wait
( tim::(+) (sleep_duration, now),
finish_do1mailoprun,
do1mailoprun_status,
fate
);
return_to__suspend_then_eventually_fire_mailops__loop (); # Return control to mailop.pkg.
}
);
# log::uninterruptible_scope_mutex := 0; # Execution will resume on this line when 'fate' is eventually called.
};
fun is_mailop_ready_to_fire () # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
=
if (sleep_duration == tim::zero_time)
#
itt::READY_MAILOP
{ fire_mailop => {. log::uninterruptible_scope_mutex := 0; } # Reppy refers to 'fire_mailop' as 'doFn'.
};
else
itt::UNREADY_MAILOP suspend_then_eventually_fire_mailop;
fi;
end;
fun sleep_for (sleep_duration: Float)
=
mop::block_until_mailop_fires (timeout_in' sleep_duration);
fun timeout_at' wakeup_time
=
itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
where
fun suspend_then_eventually_fire_mailop # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__suspend_then_eventually_fire_mailops__loop # After setting up a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
}
=
# This fn gets used in
#
#
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# suspend_then_eventually_fire_mailop ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{ fat::call_with_current_fate
(
\\ fate
=
{ time_wait (wakeup_time, finish_do1mailoprun, do1mailoprun_status, fate);
#
return_to__suspend_then_eventually_fire_mailops__loop (); # This never returns.
}
);
# log::uninterruptible_scope_mutex := 0; # Execution will resume on this line when 'fate' is eventually called.
};
fun is_mailop_ready_to_fire () # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
=
if (tim::(<=) (wakeup_time, mps::get_approximate_time ()))
#
itt::READY_MAILOP { fire_mailop => {. log::uninterruptible_scope_mutex := 0; } }; # Reppy refers to 'fire_mailop' as 'doFn'.
else
itt::UNREADY_MAILOP suspend_then_eventually_fire_mailop;
fi;
end;
fun sleep_until wakeup_time
=
mop::block_until_mailop_fires (timeout_at' wakeup_time);
};
end;