## mailop.pkg
#
# Implementation of mailop values and the mailop combinators.
#
# Our core types and datastructures are defined in:
#
#     src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
#
#
#
# Overview
# ========
#
# A 'mailop' represents a computation which might
# or might not be ready to proceed, such as a read
# from a maildrop which might or might not contain
# a value ready to be read.
#
# A mailop is in essence represented by a
#
# is_mailop_ready_to_fire()
#
# fn which tests to see if the computation
# is ready to proceed and if it
#
# IS ready: Returns a fire_mailop() fn which will perform the computation.
# is NOT ready: Returns a suspend_then_eventually_fire_mailop() fn which will arrange
# for us to be woken up when/if it is ready to fire.
#
# This structure encapsulates in function-wrapped
# form the functionality which is needed by the
#
# do_one_mailop []
#
# fn in order to select and fire exactly one mailop from a list
# of mailops, if necessary blocking until one is ready to fire.
#
#
#
# Implementation
# ==============
#
# Some important requirements on the
# implementation of base mailop values:
#
# 1) Mailop
# is_mailop_ready_to_fire()
# fire_mailop()
# suspend_then_eventually_fire_mailop()
# fns are always called from inside an uninterruptible scope. # "uninterruptible scope" == "critical section" == "atomic region" == ...
# # In practice it means that microthread_preemptive_scheduler::thread_scheduler_state is either IN_UNINTERRUPTIBLE_SCOPE or
# # IN_UNINTERRUPTIBLE_SCOPE_WITH_PENDING_THREADSWITCH, either of which prevents thread switching.
# 2) is_mailop_ready_to_fire() mailop fns return an integer priority.
# =======================
# This is 0 when not enabled,
# -1 for fixed priority and
# >0 for dynamic priority.
# The standard scheme is to associate a counter
# with the underlying mailop value and to increase
# it by one each time we try to fire the mailop.
#
#
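# 3) fire_mailop() mailop fns should NOT end the uninterruptible scope.
# ===========
# NOTE(review): the fire_mailop fns defined in this file for wait_on_condvar'
# and always' both do "log::uninterruptible_scope_mutex := 0", i.e. they DO
# end the uninterruptible scope.  Either this rule or that code is out of
# date -- confirm which and reconcile.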
#
#
# 4) suspend_then_eventually_fire_mailop() mailop fns are responsible
# ========================
# for ending the uninterruptible scope.
#
# They are also responsible for executing the "finish_do1mailoprun"
# action prior to exiting the uninterruptible scope.
# Compiled by:
#     src/lib/std/standard.lib

### "Another roof, another proof."
###
### -- Paul Erdos
stipulate
package fat = fate;                                # fate is from   src/lib/std/src/nj/fate.pkg
package itt = internal_threadkit_types;            # internal_threadkit_types is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
package rwq = rw_queue;                            # rw_queue is from   src/lib/src/rw-queue.pkg
package mps = microthread_preemptive_scheduler;    # microthread_preemptive_scheduler is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
herein
package mailop: (weak)
api {
include api Mailop;                                                # Mailop is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.api
#
set_condvar__iu: itt::Condition_Variable -> Void;
wait_on_condvar': itt::Condition_Variable -> Mailop( Void );      # Exported for use in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
}
{
# Short local names for the fate (continuation) primitives from package 'fate':
#   call_with_current_fate          is used by wait_on_condvar';
#   call_with_current_control_fate  and
#   switch_to_control_fate          are used by do_nackfree_mailops and do_nackfull_mailops.
# NOTE(review): presumably rebound locally so the calls can be inlined -- confirm.
call_with_current_control_fate = fat::call_with_current_control_fate;
switch_to_control_fate = fat::switch_to_control_fate;
call_with_current_fate = fat::call_with_current_fate;
# switch_to_fate = fat::switch_to_fate;
# Some inline functions
# to improve performance:
#
fun map f
    =
    walk
    where
        # Apply 'f' to every element of a list, left to right,
        # returning the results in the same order.
        # The recursion is unrolled four elements at a time;
        # tails of length 0..3 are handled directly.
        #
        fun walk (w ! x ! y ! z ! tail) =>  (f w) ! (f x) ! (f y) ! (f z) ! (walk tail);
            walk [x, y, z]              =>  [f x, f y, f z];
            walk [x, y]                 =>  [f x, f y];
            walk [x]                    =>  [f x];
            walk []                     =>  [];
        end;
    end;
#
fun apply f
    =
    visit
    where
        # Call 'f' on each list element in order, purely for
        # side effects; the result is always ().
        #
        fun visit items
            =
            case items
                #
                head ! tail =>  { f head;  visit tail; };
                []          =>  ();
            esac;
    end;
#
fun fold_forward f init l
    =
    step (init, l)
    where
        # Left-to-right fold: starting from 'init', replace the
        # running result by  f (element, running_result)  for
        # each element of 'l' in turn.
        #
        fun step (so_far, []         ) =>  so_far;
            step (so_far, head ! tail) =>  step (f (head, so_far), tail);
        end;
    end;
#
# Abort by raising the DIE exception carrying the given message.
# Used in this file for "cannot happen" situations.
fun error msg
=
raise exception DIE msg;
fun length l
    =
    count_from (l, 0)
    where
        # Tail-recursive element count.
        #
        fun count_from (_ ! tail, seen) =>  count_from (tail, seen + 1);
            count_from ([],       seen) =>  seen;
        end;
    end;
# Local names for the core mailop types declared in internal_threadkit_types.
# NOTE(review): the rest of this file uses READY_MAILOP, UNREADY_MAILOP, BASE_MAILOPS,
# CHOOSE_MAILOP, DYNAMIC_MAILOP and DYNAMIC_MAILOP_WITH_NACK unqualified; presumably
# the '==' declarations below are what bring those constructors into scope -- confirm.
Mailop_Readiness == itt::Mailop_Readiness;
Mailop == itt::Mailop;
Base_Mailop(X) = itt::Base_Mailop(X);
# Run_Gun and End_Gun are both just Mailop(Void); neither is used in this package's body.
Run_Gun = Mailop(Void);
End_Gun = Mailop(Void);
# Condition variables.
#
# Because these variables are set inside
# atomic regions we have to use different
# conventions for clean-up, etc. Instead
# of requiring the suspend_then_eventually_fire_mailop fate
# to call the finish_do1mailoprun fn and to end
# the uninterruptible scope, we call the finish_do1mailoprun
# function when setting the condition variable
# (in set_condvar__iu), and have the invariant
# that the suspend_then_eventually_fire_mailop fate is dispatched
# outside the atomic region.
# Nomenclature: What I'm calling "uninterruptible_scope" is usually called "critical section" or "atomic region"
# in the literature. I dislike "critical" because it is vague. ("critical" in what sense? Who knows?)
# "atomic" is literally correct ("a-tomic" == "not cuttable" -- indivisible) but the modern reader is not
# likely to take it in that sense at first blush. And neither "section" nor "region" are as apropos as "scope".
# Set a condition variable.
# Caller guarantees that this function is always
# executed in an uninterruptible scope.
#
# This is simply the scheduler's implementation re-exported under our own name.
# It is called below by do_nackfull_mailops (via check_condvar) to signal a nack.
set_condvar__iu
=
mps::set_condvar__iu;
# fun set_condvar__iu (itt::CONDITION_VARIABLE state)    # Only external call is in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
#     =
# case *state
# #
# itt::CONDVAR_IS_NOT_SET waiting_threads # waiting_threads is the list of threads sitting
# => # blocked waiting for this condvar to be set.
# { mps::foreground_run_queue -> rwq::RW_QUEUE { back, ... };
# #
# state := itt::CONDVAR_IS_SET 1; # Set the condition variable. NB: The '1' is a priority, not its (non-existent) value.
# #
# back := run waiting_threads # Add to foreground run queue all threads that were waiting for condvar to be set.
# where
# fun run [] => *back;
# #
# run ( { do1mailoprun_status=>REF itt::DO1MAILOPRUN_IS_COMPLETE, ... } ! rest)
# =>
# run rest; # Drop completed do1mailoprun.
#
# run ( { do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread), finish_do1mailoprun, fate } ! rest)
# =>
# { do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
# #
# finish_do1mailoprun (); # Do stuff like do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks.
#
# (thread, fate) ! (run rest);
# };
# end;
# end;
# };
#
# _ => error "condvar already set";
# esac;
# The mailop constructor for
# waiting on a condition variable:
#
fun wait_on_condvar' (itt::CONDITION_VARIABLE condvar_state)    # This fn is used (only) below and in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
    =
    # Build a mailop which is ready to fire once the given condition variable is set.
    # The result is a BASE_MAILOPS holding the single readiness-test fn below.
    #
    BASE_MAILOPS [ is_mailop_ready_to_fire ]
    where
        # Called when the condvar is not yet set:  capture the current fate,
        # add ourself to the condvar's waiting list, then hand control back
        # to the caller's suspend loop.
        #
        fun suspend_then_eventually_fire_mailop                 # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
              {
                do1mailoprun_status,                            # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                finish_do1mailoprun,                            # Does stuff like do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks.
                return_to__suspend_then_eventually_fire_mailops__loop
              }
            =
            call_with_current_fate
                #
                (\\ fate
                    =
                    {   waiting_threads                         # The list of threads waiting for the condvar to be set.
                            =
                            case *condvar_state
                                #
                                itt::CONDVAR_IS_NOT_SET waiting_threads => waiting_threads;
                                itt::CONDVAR_IS_SET => raise exception DIE "Bug in wait_on_condvar'";
                            esac;                               # Above exception should not happen: is_mailop_ready_to_fire() only queues us up if *condvar_state is not CONDVAR_IS_SET.
                        #
                        waiting_thread = { do1mailoprun_status, finish_do1mailoprun, fate };
                        condvar_state := itt::CONDVAR_IS_NOT_SET (waiting_thread ! waiting_threads);    # Add ourself to list of threads waiting for condvar to be set.
                        return_to__suspend_then_eventually_fire_mailops__loop ();                       # Does not return.
                    }
                );
        #
        # Readiness test:  READY_MAILOP if the condvar is already set,
        # otherwise UNREADY_MAILOP carrying the suspend fn above.
        #
        fun is_mailop_ready_to_fire ()                          # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
            =
            case *condvar_state
                #
                itt::CONDVAR_IS_SET
                    =>
                    {   READY_MAILOP { fire_mailop }
                        where
                            fun fire_mailop ()                  # Reppy refers to fire_mailop as 'doFn'.
                                =
                                log::uninterruptible_scope_mutex := 0;    # End uninterruptible scope.
                        end;
                    };
                itt::CONDVAR_IS_NOT_SET _
                    =>
                    UNREADY_MAILOP suspend_then_eventually_fire_mailop;
            esac;
    end;
# A mailop which is always ready to fire
# and produces given result:
#
fun always' result                                              # This is used a lot in (for example)   src/lib/std/src/socket/socket.pkg
    =
    # A single base mailop whose readiness test always answers READY_MAILOP.
    # Firing it resets log::uninterruptible_scope_mutex to 0 and yields 'result'.
    #
    BASE_MAILOPS
      [
        \\ () = itt::READY_MAILOP
                  { fire_mailop => \\ () = { log::uninterruptible_scope_mutex := 0;    # Reppy refers to fire_mailop as 'doFn'.
                                             result;
                                           }
                  }
      ];
# A mailop which is never ready to fire:
#
# Represented as an empty list of base mailops: there is nothing that could ever become ready.
never' = BASE_MAILOPS [];                                       # Used in:   src/lib/x-kit/widget/old/basic/hostwindow.pkg
                                                                #            src/lib/x-kit/xclient/src/window/widget-cable-old.pkg
# These generate mailops on-the-fly while 'do_one_mailop' is running.
# The second is given a mailop with which to detect client
# abort of the generated mailop:
#
# Both are just the corresponding Mailop constructors under function-style names.
dynamic_mailop           = DYNAMIC_MAILOP;
dynamic_mailop_with_nack = DYNAMIC_MAILOP_WITH_NACK;            # This is mainly used in:   src/lib/std/src/io/winix-text-file-for-os-g.pkg
                                                                # Also used in:             src/lib/std/src/io/winix-mailslot-io-g.pkg
                                                                #                           src/lib/std/src/posix/winix-data-file-io-driver-for-posix.pkg
#
# Combine a list of mailops into a single mailop:
#
fun cat_mailops (mailops: List( Mailop(X) ))                    # This gets called in (for example):   src/lib/std/src/io/winix-mailslot-io-g.pkg
    =                                                           # A frequent idiom is   block_until_mailop_fires (cat_mailops mailops);
    #
    # Flatten a list of mailops into one mailop.
    #
    # We walk the REVERSED input so that prepending to the accumulator
    # restores the original order.
    #
    #   gather   handles the run of BASE_MAILOPS at the start of the reversed
    #            list, merging their base mailops into one flat list.
    #            If the whole input is BASE_MAILOPS the result is a single BASE_MAILOPS.
    #
    #   gather'  takes over at the first non-BASE_MAILOPS entry.  It splices nested
    #            CHOOSE_MAILOP members inline, merges a BASE_MAILOPS into a
    #            BASE_MAILOPS at the head of the accumulator, and otherwise
    #            just prepends.  A one-element result is returned bare
    #            rather than wrapped in CHOOSE_MAILOP.
    #
    gather (reverse mailops, [])
    where
        fun gather ([], results) => BASE_MAILOPS results;       # Done, return results.
            #
            gather (BASE_MAILOPS []       ! rest, results) => gather (rest, results);
            gather (BASE_MAILOPS [mailop] ! rest, results) => gather (rest, mailop ! results);
            gather (BASE_MAILOPS mailops  ! rest, results) => gather (rest, mailops @ results);
            #
            gather (mailops, []) => gather' (mailops, []);
            gather (mailops, l ) => gather' (mailops, [BASE_MAILOPS l]);
        end
        also
        fun gather' ([], [mailop]) => mailop;
            gather' ([], mailops)  => CHOOSE_MAILOP mailops;
            #
            gather' (CHOOSE_MAILOP mailops ! rest, mailops')
                =>
                gather' (rest, mailops @ mailops');
            gather' (BASE_MAILOPS base_mailops ! rest, BASE_MAILOPS base_mailops' ! rest')
                =>
                gather' (rest, BASE_MAILOPS (base_mailops @ base_mailops') ! rest');
            gather' (mailop ! rest, mailops')
                =>
                gather' (rest, mailop ! mailops');
        end;
    end;
fun if_then' (mailop, added_action)
    =
    # Here we implement the "==>" op used in do_one_mailop [...] rules.
    # This op takes two arguments
    #
    #     mailop:       Mailop(X)
    #     added_action: X -> Y
    #
    # and from them constructs a new mailop of type
    #
    #     Mailop(Y)
    #
    # which does exactly what the original mailop did,
    # except that afterwards it also does added_action.
    #
    # Recall that (suppressing a few details) a (base) Mailop     # See   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    # is essentially a function is_mailop_ready_to_fire:
    #
    #     Void -> ( READY_MAILOP { fire_mailop: Void -> X }
    #             | UNREADY_MAILOP ({...} -> X)
    #             )
    #
    # The fn that actually does the useful work here is
    #
    #     fire_mailop
    #
    # -- everything else is just bookkeeping etc -- and
    # our job here is basically to replace it by
    #
    #     added_action o fire_mailop
    #
    # Everything else in this fn is just the busywork of
    # iterating over the expression:
    #
    wrap' mailop
    where
        # Note that a mailop *is* an is_mailop_ready_to_fire fn.  We hide that externally
        # as an implementation detail, but it becomes visible at this level.
        #
        # Note also that 'wrap_base_mailop' is CURRIED -- our caller does not immediately
        # supply our () arg, so we *initially* return a thunk that will *eventually*
        # evaluate is_mailop_ready_to_fire() -- we do not do so initially.
        # The returned thunk is a new mailop wrapping the old mailop.
        #
        fun wrap_base_mailop is_mailop_ready_to_fire ()
            =
            case (is_mailop_ready_to_fire ())
                #
                READY_MAILOP { fire_mailop }                       => READY_MAILOP { fire_mailop => added_action o fire_mailop };                  # The new fire_mailop value here is what this fn is all about.
                UNREADY_MAILOP suspend_then_eventually_fire_mailop => UNREADY_MAILOP (added_action o suspend_then_eventually_fire_mailop);         # Same as above in slightly different setting.
            esac;
        #
        fun wrap' (BASE_MAILOPS base_mailops) => BASE_MAILOPS (map wrap_base_mailop base_mailops);                              # Iterate through the base mailops doing the above to them.
            #
            wrap' (CHOOSE_MAILOP mailops) => CHOOSE_MAILOP (map wrap' mailops);                                                 # Iterate through the compound mailops looking for work.
            #
            wrap' (DYNAMIC_MAILOP make_mailop) => DYNAMIC_MAILOP (\\ () = if_then' (make_mailop(), added_action));              # Same core substitution in setting of dynamic mailops.
            wrap' (DYNAMIC_MAILOP_WITH_NACK f) => DYNAMIC_MAILOP_WITH_NACK (\\ mailop = if_then' (f mailop, added_action));     # Same core substitution in setting of dynamic mailops with nacks.
        end;
    end;
# '==>' is the same function as if_then':  (mailop ==> added_action).  It is used by put_in_replyqueue below.
(==>) = if_then'; # Infix synonym for readability.
#
fun make_exception_handling_mailop (mailop, exception_handler_fn)
    =
    # Return a mailop that behaves like 'mailop', except that an exception
    # raised while firing it (either immediately or after having blocked)
    # is passed to exception_handler_fn, whose result is then used instead.
    #
    add_handler mailop
    where
        # Run 'f x' under the handler.
        #
        fun guarded f x
            =
            f x
            except
                exn = exception_handler_fn exn;
        #
        # Wrap one base mailop:  curried on () so that the original
        # readiness test is only evaluated when the new one is.
        #
        fun guard_base_mailop poll ()
            =
            case (poll ())
                #
                UNREADY_MAILOP block_fn                 =>  UNREADY_MAILOP (guarded block_fn);
                READY_MAILOP { fire_mailop => fire_fn } =>  READY_MAILOP { fire_mailop => guarded fire_fn };
            esac;
        #
        # Push the wrapping down through the mailop expression.
        # Dynamic mailops are wrapped lazily, when they are generated.
        #
        fun add_handler (DYNAMIC_MAILOP_WITH_NACK build) =>  DYNAMIC_MAILOP_WITH_NACK (\\ nack = make_exception_handling_mailop (build nack, exception_handler_fn));
            add_handler (DYNAMIC_MAILOP build)           =>  DYNAMIC_MAILOP (\\ () = make_exception_handling_mailop (build (), exception_handler_fn));
            add_handler (CHOOSE_MAILOP members)          =>  CHOOSE_MAILOP (map add_handler members);
            add_handler (BASE_MAILOPS base_members)      =>  BASE_MAILOPS (map guard_base_mailop base_members);
        end;
    end;
# Result of scanning a Mailop expression (see 'scan' and 'do_one_mailop'):
#   NACKFREE_MAILOPS  -- a flat list of base mailops with no nack condvars involved;
#   NACKFULL_MAILOPS  -- a list of sub-groups, at least one of which involves a nack condvar;
#   WITHNACK_MAILOP   -- a sub-group together with the condvar created for its DYNAMIC_MAILOP_WITH_NACK.
Scanned_Mailops(X)
= NACKFREE_MAILOPS List( Base_Mailop(X) )
| NACKFULL_MAILOPS List( Scanned_Mailops(X) )
| WITHNACK_MAILOP (itt::Condition_Variable, Scanned_Mailops(X))
;
# +DEBUG
# fun sayGroup (msg, eg) = let
#     fun f (NACKFREE_MAILOPS l, sl) = "NACKFREE_MAILOPS(" ! int::to_string (list::length l) ::()) ! sl
#       | f (NACKFULL_MAILOPS l, sl) = "NACKFULL_MAILOPS(" ! g (l, ")" ! sl)
#       | f (WITHNACK_MAILOP l, sl) = "WITHNACK_MAILOP(" ! f(#2 l, ")" ! sl)
#     also g ([], sl) = sl
#       | g ([x], sl) = f (x, sl)
#       | g (x ! r, sl) = f (x, ", " ! g (r, sl))
# in
#     Debug::sayDebugId (string::cat (msg ! ": " ! f (eg, ["\n"])))
# end
# -DEBUG
# Scan mailop expression to run.
# In particular, this evaluates all
# dynamic rules to generate the actual
# final rules to select between:
#
# scan:  Mailop(X) -> Scanned_Mailops(X)
#
# A bare BASE_MAILOPS is returned at once as NACKFREE_MAILOPS.
# Anything else goes through scan', which
#   - expands DYNAMIC_MAILOP by calling its generator and scanning the result;
#   - expands DYNAMIC_MAILOP_WITH_NACK by creating a fresh, unset condvar, passing
#     wait_on_condvar' of it to the generator, and tagging the scanned result
#     with that condvar via WITHNACK_MAILOP;
#   - flattens CHOOSE_MAILOP via scan_mailops / scan_nackfull_mailops.
# This fn is called only from block_until_mailop_fires in this file.
fun scan (BASE_MAILOPS l)
=>
NACKFREE_MAILOPS l;
scan mailop
=>
scan' mailop
where
fun scan' (DYNAMIC_MAILOP make_mailop)
=>
scan' (make_mailop ());
scan' (DYNAMIC_MAILOP_WITH_NACK f)
=>
{ condvar = itt::CONDITION_VARIABLE (REF (itt::CONDVAR_IS_NOT_SET []));
#
WITHNACK_MAILOP (condvar, scan' (f (wait_on_condvar' condvar)));
};
scan' (BASE_MAILOPS mailops)
=>
NACKFREE_MAILOPS mailops;
scan' (CHOOSE_MAILOP mailops)
=>
scan_mailops (mailops, []) # Optimistically assume nackfree; we'll fall back to scan_nackfull_mailops() if we're wrong.
where
# Accumulate base mailops while every member scans as NACKFREE_MAILOPS.
# At the first member that does not, switch to scan_nackfull_mailops,
# carrying along what has been accumulated so far as one NACKFREE_MAILOPS group.
fun scan_mailops ([], mailops)
=>
NACKFREE_MAILOPS mailops;
scan_mailops (mailop ! rest, mailops')
=>
case (scan' mailop)
#
NACKFREE_MAILOPS mailops => scan_mailops (rest, mailops @ mailops' );
NACKFULL_MAILOPS mailops => scan_nackfull_mailops (rest, mailops @ [NACKFREE_MAILOPS mailops']);
/* */ mailops => scan_nackfull_mailops (rest, [mailops, NACKFREE_MAILOPS mailops']);
esac;
end
also
# Accumulate a list of groups.  A NACKFREE_MAILOPS member is merged into a
# NACKFREE_MAILOPS group at the head of the accumulator when there is one;
# a NACKFULL_MAILOPS member has its sub-groups spliced in; anything else is prepended.
# A one-group result is returned bare instead of being wrapped in NACKFULL_MAILOPS.
fun scan_nackfull_mailops ([], [group]) => group; # The general case vs scan_mailops above handling the nice simple (and common) case.
scan_nackfull_mailops ([], l) => NACKFULL_MAILOPS l;
#
scan_nackfull_mailops (mailop ! rest, l)
=>
case (scan' mailop, l)
#
(NACKFREE_MAILOPS mailops, NACKFREE_MAILOPS mailops' ! rest')
=>
scan_nackfull_mailops (rest, NACKFREE_MAILOPS (mailops @ mailops') ! rest');
(NACKFULL_MAILOPS mailops, l) => scan_nackfull_mailops (rest, mailops @ l);
( mailops, l) => scan_nackfull_mailops (rest, mailops ! l); # Here 'mailops' can be NACKFREE_MAILOPS or WITHNACK_MAILOP.
esac;
end; end; end; end;end; # Closes, in order: fun scan_nackfull_mailops, inner 'where', fun scan', outer 'where', fun scan.
stipulate
    #
    rotor = REF 0;                                  # Steps 0, 1, ..., 1,000,000 and then wraps back to 0 -- one step per pick_fairly call.
    #
    # Return an index in 0..i-1 derived from the rotating counter,
    # so that successive calls spread their picks across the 'i'
    # candidates instead of always favouring the same one.
    #
    fun pick_fairly i
        =
        {   current = *rotor;
            #
            if (current >= 1000000)   rotor := 0;               # No locking needed because we run on a single hostthread
            else                      rotor := current+1;       # and microthread pre-emption is done only at start of a fn.
            fi;
            #
            int::rem (current, i);
        };
herein
    #
    # Choose one entry from a non-empty list of ready-to-fire entries.
    # A singleton list is answered directly, without touching the counter.
    #
    fun fairly_pick_mailop_to_fire fire_mailop_fns
        =
        case fire_mailop_fns
            #
            [ only_choice ] =>  only_choice;                    # Only one choice -- easy work!
            _               =>  list::nth (fire_mailop_fns, pick_fairly (length fire_mailop_fns));
        esac;
end;
#
fun make__do1mailoprun_status__and__finish_do1mailoprun ()
    =
    # Create the per-run bookkeeping pair handed to suspend fns:
    #   do1mailoprun_status  -- a fresh refcell, initially DO1MAILOPRUN_IS_BLOCKED on the current microthread;
    #   finish_do1mailoprun  -- a thunk which marks that refcell DO1MAILOPRUN_IS_COMPLETE.
    #
    { do1mailoprun_status, finish_do1mailoprun }
    where
        do1mailoprun_status = REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_microthread()));
        #
        fun finish_do1mailoprun ()
            =
            do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
    end;
stipulate
# When we have exactly one mailop in the do_one_mailop[...] we can use simple logic:
#
fun do_lone_mailop (is_mailop_ready_to_fire: Base_Mailop(X))
    =
    # Run a do_one_mailop [...] consisting of exactly one base mailop.
    # We enter the uninterruptible scope, test the mailop once, and then
    # either fire it directly or hand it our bookkeeping and let it suspend us.
    #
    {   mps::assert_not_in_uninterruptible_scope "do_lone_mailop";
        log::uninterruptible_scope_mutex := 1;
        #
        case (is_mailop_ready_to_fire ())
            #
            UNREADY_MAILOP block_fn
                =>
                {   (make__do1mailoprun_status__and__finish_do1mailoprun ())
                        ->
                        { do1mailoprun_status, finish_do1mailoprun };
                    #
                    block_fn
                      { do1mailoprun_status,
                        finish_do1mailoprun,
                        return_to__suspend_then_eventually_fire_mailops__loop
                            =>
                            mps::dispatch_next_thread__xu__noreturn       # With only one mailop there is nothing to loop over, so "returning to the loop" just runs the next thread.
                      };
                };
            READY_MAILOP { fire_mailop => fire }                          # Reppy refers to 'fire_mailop' as 'doFn'.
                =>
                fire ();
        esac;
    };
# Result of polling every base mailop once in do_nackfree_mailops:
#   NO_READY_MAILOPS -- none was ready; carries the suspend fns returned by the mailops;
#   READY_MAILOPS    -- at least one was ready; carries the fire fns of the ready ones.
Test_Mailops_For_Readiness_To_Fire__Result(X)
#
= NO_READY_MAILOPS { start_mailop_watch__fns: List (itt::Suspend_Then_Eventually_Fire_Mailop__Fn(X)) }
| READY_MAILOPS { fire_mailop_fns: List (Void -> X) }
;
herein
# This function handles the case of picking
# and firing one of a list of mailops thunks
# (without any negative acknowledgements).
#
# It also handles NEVER.
#
# do_nackfree_mailops:  List(Base_Mailop(X)) -> X
#
#   []       -- nothing can ever fire: just run the next thread.
#   [mailop] -- delegate to do_lone_mailop.
#   longer   -- enter the uninterruptible scope and poll every mailop once.
#               If any are ready, pick one fairly and fire it.
#               Otherwise capture the current control fate and call each
#               mailop's suspend fn in turn; each one is expected to call
#               return_to__suspend_then_eventually_fire_mailops__loop so that
#               the next can be set up, and after the last we run the next thread.
fun do_nackfree_mailops [] => mps::dispatch_next_thread__noreturn (); # 'do_one_mailop' with empty rule list: No mailop can ever fire,
# # so there is no way to ever make progress -- end of thread!
# # XXX BUGGO FIXME Should we call some thread_exit type fn here?
# # Or throw an exception or log something?
# # Or at LEAST set MICROTHREAD.state to non-itt::state::ALIVE?
# # Just hanging is going to be bloody mysterious to debug.
#
do_nackfree_mailops [mailop] => do_lone_mailop mailop; # This is the only call to do_lone_mailop().
do_nackfree_mailops mailops
=>
{
mps::assert_not_in_uninterruptible_scope "do_nackfree_mailops";
log::uninterruptible_scope_mutex := 1; # Start uninterruptible scope. (Aka "critical section", "atomic region" etc.)
#
case (test_mailops_for_readiness_to_fire (mailops, []))
#
READY_MAILOPS { fire_mailop_fns }
=>
{ mailop_to_fire = fairly_pick_mailop_to_fire fire_mailop_fns; # Pick a do_one_mailop[....] mailop to fire ...
mailop_to_fire (); # ... and then fire it.
};
NO_READY_MAILOPS { start_mailop_watch__fns }
=>
call_with_current_control_fate
#
(\\ fate = { run__suspend_then_eventually_fire_mailops__loop__noreturn start_mailop_watch__fns; error "[run__suspend_then_eventually_fire_mailops__loop__noreturn returned?!]"; }
where # Execution should never reach above.
switch_to_control_fate = switch_to_control_fate fate; # Shadows the package-level fn with one already bound to our captured fate.
#
(make__do1mailoprun_status__and__finish_do1mailoprun ()) # One status/finish pair shared by every mailop in this run.
->
{ do1mailoprun_status, finish_do1mailoprun };
#
fun run__suspend_then_eventually_fire_mailops__loop__noreturn [] # We have now set up ready watches on all mailops, so
=> # let the next ready-to-run microthread run while
mps::dispatch_next_thread__xu__noreturn (); # we wait for one of these mailops to become ready to fire.
#
run__suspend_then_eventually_fire_mailops__loop__noreturn (suspend_then_eventually_fire_mailop ! remaining_mailops)
=>
switch_to_control_fate
#
(suspend_then_eventually_fire_mailop # Set up ready-watch for this particular mailop.
{ do1mailoprun_status,
finish_do1mailoprun,
return_to__suspend_then_eventually_fire_mailops__loop # maildrop.pkg, mailslot.pkg etc call this to return control to us.
=>
{. run__suspend_then_eventually_fire_mailops__loop__noreturn remaining_mailops; } # Set up ready watches on remaining mailops.
}
);
end;
end
);
esac;
}
where
#
# First polling loop: runs until the first ready mailop is found, collecting
# the suspend fns of the unready ones (in reverse order of polling).
fun test_mailops_for_readiness_to_fire (is_mailop_ready_to_fire ! rest, start_mailop_watch__fns) # In this loop we have not yet found any mailops ready to fire.
=>
case (is_mailop_ready_to_fire ())
#
UNREADY_MAILOP suspend_then_eventually_fire_mailop
=>
test_mailops_for_readiness_to_fire (rest, suspend_then_eventually_fire_mailop ! start_mailop_watch__fns);
READY_MAILOP { fire_mailop }
=>
test_mailops_for_readiness_to_fire' (rest, [ fire_mailop ]);
esac;
test_mailops_for_readiness_to_fire ([], start_mailop_watch__fns) # Done -- no ready-to-fire mailops found and no candidates left to check.
=>
NO_READY_MAILOPS { start_mailop_watch__fns }; # No mailops were ready to fire; return list of fns which
end # each start a watch on one mailop for readiness to fire.
also
# Second polling loop: once one ready mailop is known, the suspend fns are
# no longer needed, so only further fire fns are collected.
fun test_mailops_for_readiness_to_fire' (is_mailop_ready_to_fire ! rest, fire_mailop_fns) # In this loop we have found at least one mailop which is ready to fire.
=>
case (is_mailop_ready_to_fire ())
#
READY_MAILOP { fire_mailop }
=> test_mailops_for_readiness_to_fire' (rest, fire_mailop ! fire_mailop_fns);
_ => test_mailops_for_readiness_to_fire' (rest, fire_mailop_fns);
esac;
test_mailops_for_readiness_to_fire' ([], fire_mailop_fns)
=>
READY_MAILOPS { fire_mailop_fns }; # At least one mailop was ready to fire; return the fns which
# will fire the ready-to-fire mailops.
end; # fun test_mailops_for_readiness_to_fire'
# NOTE: Maybe we should just keep
# track of the max priority above?
# What about fairness to fixed
# priority mailops (e::g., always, timeout?)
end; # where
end; # fun do_nackfree_mailops
end; # stipulate
stipulate
# Walk the mailop group tree,
# collecting the base mailops
# (with associated ack flags),
# also a list of nackstates.
#
# A nackstate is a
# (Condvar, List(Flag(Ack)))
# pair, where the flags are
# those associated with the
# mailops covered by the nack
# condvar.
#
# gather_base_mailops_and_nackstates:  Scanned_Mailops(X) -> (bl, nackstates)
#
#   bl          is a list of (base mailop, flag refcell) pairs;
#   nackstates  is a list of (condvar, flags) pairs, one per WITHNACK_MAILOP encountered.
#
# For a top-level NACKFULL_MAILOPS, base mailops which are not inside any
# WITHNACK_MAILOP all share the single 'un_wrapped_flag' refcell and are not
# recorded in any nackstate; WITHNACK_MAILOP members are handled by gather_wrapped.
# Any other top-level group goes straight to gather_wrapped.
fun gather_base_mailops_and_nackstates mailops
=
case mailops
#
NACKFULL_MAILOPS _
=>
gather_mailops (mailops, [], [])
where
un_wrapped_flag = REF FALSE; # Shared by every base mailop that is not under a nack condvar.
#
fun reverse_and_prepend_mailops (mailop ! rest, results) => reverse_and_prepend_mailops (rest, (mailop, un_wrapped_flag) ! results);
reverse_and_prepend_mailops ( [], results) => results;
end;
#
fun gather_mailops (NACKFREE_MAILOPS nackfree_mailops, bl, nackstates)
=>
(reverse_and_prepend_mailops (nackfree_mailops, bl), nackstates);
gather_mailops (NACKFULL_MAILOPS nackfull_mailops, bl, nackstates)
=>
fold_forward f (bl, nackstates) nackfull_mailops
where
fun f (group', (bl', nackstates'))
=
gather_mailops (group', bl', nackstates');
end;
gather_mailops (withnack as WITHNACK_MAILOP _, bl, nackstates)
=>
gather_wrapped (withnack, bl, nackstates);
end;
end;
group => gather_wrapped (group, [], []);
esac
where
# Like 'gather' below, but discards the accumulated all_flags list.
fun gather_wrapped (group, bl, nackstates)
=
{ (gather (group, bl, [], nackstates))
->
(bl, _, nackstates);
(bl, nackstates);
}
where
# gather threads three accumulators:
#   bl          -- (base mailop, flag) pairs so far;
#   all_flags   -- flags of the base mailops seen so far at the current nesting level;
#   nackstates  -- (condvar, flags) pairs so far.
fun gather (NACKFREE_MAILOPS mailops, bl, all_flags, nackstates)
=>
{ (reverse_and_prepend_mailops (mailops, bl, all_flags))
->
(bl', all_flags');
(bl', all_flags', nackstates);
}
where
fun reverse_and_prepend_mailops ([], bl, all_flags)
=>
(bl, all_flags);
reverse_and_prepend_mailops (mailop ! rest, bl, all_flags)
=>
{ flag = REF FALSE; # Each base mailop gets its own fresh flag here.
#
reverse_and_prepend_mailops (rest, (mailop, flag) ! bl, flag ! all_flags);
};
end;
end;
gather (NACKFULL_MAILOPS group, bl, all_flags, nackstates)
=>
fold_forward f (bl, all_flags, nackstates) group
where
fun f (group', (bl', all_flags', nackstates'))
=
gather (group', bl', all_flags', nackstates');
end;
gather (WITHNACK_MAILOP (condvar, group), bl, all_flags, nackstates)
=>
{ (gather (group, bl, [], nackstates)) # Start from an empty flag list so that all_flags' holds exactly the flags under this condvar.
->
(bl', all_flags', nackstates');
( bl',
all_flags' @ all_flags, # Those flags also count for any enclosing WITHNACK_MAILOP.
(condvar, all_flags') ! nackstates'
);
};
end; # fun gather
end; # where
end; # where
# Result of polling every base mailop once in do_nackfull_mailops.
# Same shape as the nack-free variant, except that every fn is
# paired with the flag refcell of the mailop it came from.
Test_Mailops_For_Readiness_To_Fire__Result(X)
#
= NO_READY_MAILOPS { start_mailop_watch__fns: List ((itt::Suspend_Then_Eventually_Fire_Mailop__Fn(X), Ref(Bool))) }
| READY_MAILOPS { fire_mailop_fns: List ( ((Void -> X), Ref(Bool)) ) }
;
herein
# This function handles the more
# complicated case of running a
# mailop expression where negative
# acknowledgements are involved.
#
# do_nackfull_mailops:  Scanned_Mailops(X) -> X
#
# Flatten 'group' into (base mailop, flag) pairs plus nackstates, enter the
# uninterruptible scope and poll every base mailop once.
#
#   If some are ready:  pick one fairly, set its flag, send nacks, fire it.
#   Otherwise:          capture the current control fate and let each mailop
#                       suspend in turn, giving each a finish fn which, when
#                       called, marks the run complete, sets that mailop's flag
#                       and sends nacks.
#
# "Sending nacks" means setting the condvar of every nackstate none of whose flags is TRUE.
fun do_nackfull_mailops group
=
{
mps::assert_not_in_uninterruptible_scope "do_nackfull_mailops";
log::uninterruptible_scope_mutex := 1; # Start uninterruptible scope (aka "critical section" aka "atomic region"...)
#
case (test_mailops_for_readiness_to_fire (mailops, []))
#
READY_MAILOPS { fire_mailop_fns }
=>
{ (fairly_pick_mailop_to_fire fire_mailop_fns)
->
(fire_mailop, flag);
flag := TRUE; # Mark the chosen mailop before checking condvars, so its own nack is not sent.
send_nacks_as_appropriate ();
fire_mailop ();
};
NO_READY_MAILOPS { start_mailop_watch__fns }
=>
call_with_current_control_fate
(\\ fate
=
{ run__suspend_then_eventually_fire_mailops__loop__noreturn start_mailop_watch__fns; # Ask each mailop in the do_one_mailop [...] list to enqueue itself as appropriate.
#
error "[run__suspend_then_eventually_fire_mailops__loop__noreturn returned?!]"; # This cannot happen -- run__suspend_then_eventually_fire_mailops__loop__noreturn() runs next thread when done.
}
where
switch_to_control_fate = switch_to_control_fate fate; # Shadows the package-level fn with one already bound to our captured fate.
#
do1mailoprun_status = REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_microthread ())); # 'do1mailoprun_status' is basically a mutex ensuring exactly one mailop fires per do1mailoprun.
#
fun finish_do1mailoprun_fn flag () # This will be called by the first mailop to fire.
=
{ do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
flag := TRUE;
send_nacks_as_appropriate ();
};
#
fun run__suspend_then_eventually_fire_mailops__loop__noreturn []
=>
mps::dispatch_next_thread__xu__noreturn (); # All watches are set up; run the next thread.
#
run__suspend_then_eventually_fire_mailops__loop__noreturn ((suspend_then_eventually_fire_mailop, flag) ! rest)
=>
switch_to_control_fate
(
suspend_then_eventually_fire_mailop
{
do1mailoprun_status,
finish_do1mailoprun => finish_do1mailoprun_fn flag, # Partially applied: each mailop gets a finish fn bound to its own flag.
return_to__suspend_then_eventually_fire_mailops__loop # maildrop.pkg, mailslot.pkg etc call this to return control to us.
=>
\\ () = run__suspend_then_eventually_fire_mailops__loop__noreturn rest
}
);
end; # fun run__suspend_then_eventually_fire_mailops__loop__noreturn
end # where
);
esac;
}
where
(gather_base_mailops_and_nackstates group)
->
(mailops, nackstates);
#
fun send_nacks_as_appropriate ()
=
apply check_condvar nackstates # NB: We capture 'nackstates' here for later use in 'finish_do1mailoprun_fn'.
where
# check_condvar checks the flags of a nackstate.
# If they are all FALSE then the
# corresponding condvar is set to signal
# the negative ack.
#
fun check_condvar (condvar, flags)
=
check_flags flags
where
fun check_flags ((REF TRUE) ! _) => ();
check_flags (_ ! rest) => check_flags rest;
check_flags [] => set_condvar__iu condvar;
end;
end;
end;
#
# First polling loop: runs until the first ready mailop is found, collecting
# the (suspend fn, flag) pairs of the unready ones.
fun test_mailops_for_readiness_to_fire ((is_mailop_ready_to_fire, flag) ! rest, start_mailop_watch__fns) # In this loop we have not yet found a mailop ready to fire.
=>
case (is_mailop_ready_to_fire ())
#
READY_MAILOP { fire_mailop } => test_mailops_for_readiness_to_fire' (rest, [ (fire_mailop, flag)]);
UNREADY_MAILOP suspend_then_eventually_fire_mailop => test_mailops_for_readiness_to_fire (rest, (suspend_then_eventually_fire_mailop, flag) ! start_mailop_watch__fns);
esac;
test_mailops_for_readiness_to_fire ([], start_mailop_watch__fns) # Done -- no ready-to-fire mailops found and none left to check,
=> # so we must block until some mailop becomes ready to fire.
NO_READY_MAILOPS { start_mailop_watch__fns };
end
also
# Second polling loop: once one ready mailop is known, only further
# (fire fn, flag) pairs are collected.
fun test_mailops_for_readiness_to_fire' ((is_mailop_ready_to_fire, flag) ! rest, fire_mailop_fns) # In this loop we have found at least one do_one_mailop[...] mailop ready to fire.
=>
case (is_mailop_ready_to_fire ())
#
READY_MAILOP { fire_mailop }
=>
test_mailops_for_readiness_to_fire' (rest, (fire_mailop, flag) ! fire_mailop_fns);
_ => test_mailops_for_readiness_to_fire' (rest, fire_mailop_fns);
esac;
test_mailops_for_readiness_to_fire' ([], fire_mailop_fns) # Done -- pick one do_one_mailop[...] mailop to fire and then fire it.
=>
READY_MAILOPS { fire_mailop_fns };
end;
# NOTE: Maybe above we should just
# keep track of the max priority?
# What about fairness to fixed
# priority mailops (e::g., always, timeout?)
#
end; # fun do_nackfull_mailops
end; # stipulate
#
fun block_until_mailop_fires mailop                            # PUBLIC.
    =
    # Scan the mailop expression, then run it with whichever
    # engine suits the scanned form:  the simple one when no nack
    # condvars are involved, the general one otherwise.
    #
    run_scanned (scan mailop)
    where
        fun run_scanned (NACKFREE_MAILOPS base_mailops) =>  do_nackfree_mailops base_mailops;
            run_scanned other                            =>  do_nackfull_mailops other;
        end;
    end;
# 'do_one_mailop' is our core entrypoint, the 'do_one_mailop' used by clients
# to do handle-multiple-mail-sources style thread I/O via
# statements looking like
#
# do_one_mailop [
# foo' ==> {. do_this (); },
# bar' ==> {. do_that (); },
# ...
# ];
#
# We have two main cases:
# 1) If one or more mailops in the list is ready to fire, we pick one and fire it.
# 2) If no mailop in the list is ready to fire, we must block until one becomes ready, then continue as in 1).
# do_one_mailop:  List(Mailop(X)) -> X
#
# Scan the given list of mailops into Scanned_Mailops form (expanding dynamic
# mailops on the way), then run the result with do_nackfree_mailops when the
# scan produced NACKFREE_MAILOPS and with do_nackfull_mailops otherwise.
fun do_one_mailop mailops # PUBLIC.
=
case (scan_mailops (mailops, []))
#
NACKFREE_MAILOPS mailops => do_nackfree_mailops mailops; # This is special-case handling for simple special case of no WITH_NACK mailops.
/* */ mailops => do_nackfull_mailops mailops; # This is the general case.
esac
where
# Preparation. During this phase we need to:
#
# o Expand DYNAMIC_MAILOP # These are essentially hacks to allow generating
# and DYNAMIC_MAILOP_WITH_NACK clauses. # mailops on the fly while 'do_one_mailop' is running.
#
# o Figure out whether we have any WITH_NACK clauses,
# which complicate things.
#
# In the common case of no WITH_NACK mailops
# we return NACKFREE_MAILOPS _, otherwise
# we return NACKFULL_MAILOPS _ or WITHNACK_MAILOP _.
#
fun scan_mailops (mailop ! rest, mailops) # scan_mailops handles the nice simple nack-free case;
=> # scan_nackfull_mailops handles the case where one or more WITH_NACK clauses are present.
case (scan_one_mailop mailop)
#
/* */ NACKFREE_MAILOPS mailops' => scan_mailops (rest, mailops' @ mailops );
/* */ NACKFULL_MAILOPS mailops' => scan_nackfull_mailops (rest, mailops' @ [ NACKFREE_MAILOPS mailops ]);
wm as WITHNACK_MAILOP _ => scan_nackfull_mailops (rest, [ wm, NACKFREE_MAILOPS mailops ]);
esac;
scan_mailops ([], mailops) => NACKFREE_MAILOPS mailops; # Done!
end
also
# Accumulates a list of groups; a single resulting group is returned bare
# rather than wrapped in NACKFULL_MAILOPS.
fun scan_nackfull_mailops ([], [result]) => result;
scan_nackfull_mailops ([], results) => NACKFULL_MAILOPS results;
#
scan_nackfull_mailops (mailop ! rest, results)
=>
case (scan_one_mailop mailop, results)
#
(NACKFREE_MAILOPS mailops, NACKFREE_MAILOPS mailops' ! rest') # Merge into the NACKFREE_MAILOPS group at the head of the results.
=>
scan_nackfull_mailops (rest, NACKFREE_MAILOPS (mailops @ mailops') ! rest');
(NACKFULL_MAILOPS mailops, results) => scan_nackfull_mailops (rest, mailops @ results);
(other, results) => scan_nackfull_mailops (rest, other ! results); # 'other' can be (NACKFREE_MAILOPS _) or (WITHNACK_MAILOP _).
esac;
end
also
fun scan_one_mailop (BASE_MAILOPS mailops) => NACKFREE_MAILOPS mailops;
scan_one_mailop (CHOOSE_MAILOP mailops) => scan_mailops (mailops, []);
scan_one_mailop (DYNAMIC_MAILOP make_mailop) => scan_one_mailop (make_mailop ()); # Generate a do_one_mailop[...] mailop on the fly, then recursively scan it.
#
scan_one_mailop (DYNAMIC_MAILOP_WITH_NACK make_mailop) # Like DYNAMIC_MAILOP but with a nack mailop to signal client-code abort.
=>
{ condvar = itt::CONDITION_VARIABLE (REF (itt::CONDVAR_IS_NOT_SET [])); # Fresh, unset condvar with no waiting threads.
#
WITHNACK_MAILOP (condvar, scan_one_mailop (make_mailop (wait_on_condvar' condvar)));
};
end;
end; # fun do_one_mailop
# One pending reply:  an id plus the mailop that will fire when the reply arrives.
Replyqueue_Entry # See Note[1] below.
=
{ id: Int, # The 'id' field is so that we can unambiguously remove a request from a request queue -- Mailops are not an equality type.
op: Mailop(Void) # This will be the oneshot for the reply from another imp, together with our logic for handling that reply.
};
# A mutable collection of pending replies, together with the counter used to number them.
# Created by make_replyqueue, extended by put_in_replyqueue, consumed by do_one_mailop'.
Replyqueue
=
{ next_id: Ref(Int), # Used to generate Request.id values.
queue: Ref( List( Replyqueue_Entry ) ) # The request queue proper, a list of reply oneshots wrapped in handling logic.
};
fun make_replyqueue ()
    =
    # A fresh replyqueue:  no entries yet, and ids start at 1.
    #
    { next_id, queue }
    where
        next_id = REF 1;
        queue   = REF [];
    end;
fun put_in_replyqueue                                           # Remember that we're expecting a reply to a request made of an external imp.
      (
        q:  Replyqueue,                                         # This is our imp-private reply queue.
        op: Mailop(Void)                                        # This is the mailop which will fire when our reply arrives.
      )
    =
    # Register 'op' in 'q' under a freshly allocated id.
    # The stored mailop is 'op' extended so that, after it fires,
    # it removes its own entry from the queue again.
    #
    {   id = *q.next_id;                                        # Take the next free id ...
        q.next_id := id + 1;                                    # ... and advance the counter.
        #
        op = (op ==> (\\ x = { drop_from_replyqueue { q, id };   x; }));
        q.queue := { id, op } ! *q.queue;                       # Newest entry goes at the front.
    }
    where
        # Rebuild q.queue without the entry (or entries) carrying the given id.
        # The surviving entries come out in reverse order, which does not matter here.
        #
        fun drop_from_replyqueue { q: Replyqueue, id: Int }
            =
            q.queue := survivors (*q.queue, [])
            where
                fun survivors ([], kept)
                        =>
                        kept;
                    survivors ((entry as { id => entry_id, op }) ! remaining, kept)
                        =>
                        if (entry_id == id)   survivors (remaining, kept);              # This is the one to drop.
                        else                  survivors (remaining, entry ! kept);      # Keep everything else.
                        fi;
                end;
            end;
    end;
fun do_one_mailop' (q: Replyqueue) mailops                      # Just like do_one_mailop, except it also processes the replyqueue of pending replies from other imps.
    =
    # Put the mailop of every entry currently in the replyqueue in front of
    # the caller's 'mailops', then hand the combined list to do_one_mailop.
    #
    do_one_mailop (with_pending_replies (*q.queue, mailops))
    where
        fun with_pending_replies (entries, accumulated)
            =
            case entries
                #
                { id, op } ! later =>  with_pending_replies (later, op ! accumulated);
                []                 =>  accumulated;
            esac;
    end;
# Format a short description of a replyqueue:  the caller-supplied 'name'
# and the number of entries currently queued.  The next_id field is not shown.
# NOTE(review): the format string below spans a line break, so the text between
# '{' and '|RQ|' is uncertain -- confirm the intended literal before reformatting.
fun replyqueue_to_string ({ next_id, queue => REF q }, name)
=
sprintf "{
|RQ|:%s =%d}" name (length q);
}; # package mailop
end;
##############################################################################################
# Note[1]
#
# To avoid deadlock, it is critically important that each imp
# write to message queues (a write to a message queue cannot block)
# and read via a single call (ensuring it cannot be blocked on one
# read call while another has input ready to process).
#
# Obtaining a value from another imp is done by putting it in a
# request queue together with a oneshot maildrop for the reply.
# Our imp then needs to keep a list of such oneshot-maildrops
# and scan it as part of our mailop input statement (do_one_mailop []).
# Any oneshot which fires during such a read statement needs to
# be dropped from the request queue, since it will never fire again
# and thus is just clutter and a potential memory leak.
#
# Note that since a refcell is equal to itself and nothing else,
# we could use Ref(Void) refcells for the id type, thus avoiding
# the need for next_id and the risk of Int wrap-around. I'm using
# Int anyhow because it simplifies debugging by allowing logging
# of more readable traces.
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.