


## mailop.pkg
#
# Implementation of mailop values and the mailop combinators.
#
# Some important requirements on the implementation
# of base mailop values:
#
# 1) An is_mailop_ready_to_fire(), fire_mailop()
# or set_up_mailopready_watch() fn
# is always called from inside an uninterruptible scope. # "uninterruptible scope" == "critical section" == "atomic region" == ...
# # In practice it means that microthread_preemptive_scheduler::thread_scheduler_state is either IN_UNINTERRUPTIBLE_SCOPE or
# # IN_UNINTERRUPTIBLE_SCOPE_WITH_PENDING_THREADSWITCH, either of which prevents thread switching.
# 2) An is_mailop_ready_to_fire() returns an integer priority.
# This is 0 when not enabled,
# -1 for fixed priority and
# >0 for dynamic priority.
# The standard scheme is to associate a counter
# with the underlying synchronization value and
# to increase it by one for each synchronization attempt.
#
# 3) A set_up_mailopready_watch fn is responsible
# for ending the uninterruptible scope.
#
# A fire_mailop should NOT end the uninterruptible scope.
#
# 4) Each set_up_mailopready_watch fn is responsible for executing
# the "finish_do1mailoprun" action prior to ending the uninterruptible scope.
# Compiled by:
#     src/lib/std/standard.lib

### "Another roof, another proof."
###
###                 -- Paul Erdos
stipulate
    # Short local names for the packages this file depends upon.
    # One declaration per line: a '#' comment runs to end-of-line, so
    # declarations sharing a line with an earlier comment are not seen by the compiler.
    #
    package fat =  fate;                                # fate                              is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;            # internal_threadkit_types          is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package rwq =  rw_queue;                            # rw_queue                          is from   src/lib/src/rw-queue.pkg
    package mps =  microthread_preemptive_scheduler;    # microthread_preemptive_scheduler  is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
herein
package mailop: (weak)
api {
    # Everything in the public Mailop api, plus two condition-variable
    # entrypoints exported for use by other thread-kit packages.
    #
    include Mailop;                                                     # Mailop is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.api
    #
    set_condvar__iu:    itt::Condition_Variable -> Void;
    wait_on_condvar':   itt::Condition_Variable -> Mailop( Void );      # Exported for use in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
}
{
# Unqualified local names for the fate ("continuation") primitives
# used below; each is simply rebound from package 'fat'.
#
call_with_current_control_fate = fat::call_with_current_control_fate;
switch_to_control_fate = fat::switch_to_control_fate;
call_with_current_fate = fat::call_with_current_fate;
# switch_to_fate = fat::switch_to_fate;                         # Not currently rebound -- left commented out.
# Some inline functions
# to improve performance:
#
fun map f
    =
    # Local list-map, unrolled to consume four elements per recursive call.
    # Applies 'f' to each element in left-to-right order and returns the
    # list of results in the same order.
    #
    walk
    where
        fun walk list
            =
            case list
                #
                (a ! b ! c ! d ! rest)  =>  (f a) ! (f b) ! (f c) ! (f d) ! (walk rest);        # Four or more elements left: do four, then recurse.
                [a, b, c]               =>  [f a, f b, f c];
                [a, b]                  =>  [f a, f b];
                [a]                     =>  [f a];
                []                      =>  [];
            esac;
    end;
#
fun apply f
    =
    # Call 'f' on every element of a list, front to back,
    # purely for side effects.  Result is always ().
    #
    loop
    where
        fun loop list
            =
            case list
                #
                (head ! tail)  =>  { f head;  loop tail; };
                []             =>  ();
            esac;
    end;
#
fun fold_forward f init l
    =
    # Left-to-right fold:  'f' receives (element, accumulator)
    # and returns the new accumulator;  'init' is the starting
    # accumulator and is returned unchanged for an empty list.
    #
    loop (init, l)
    where
        fun loop (so_far, remaining)
            =
            case remaining
                #
                (head ! tail)  =>  loop (f (head, so_far), tail);
                []             =>  so_far;
            esac;
    end;
#
fun error message
    =
    # Abort with a FAIL exception carrying the given text.  Never returns.
    #
    raise exception FAIL message;
# Re-export the core mailop types from internal_threadkit_types under local names.
# The code below uses the constructors MAILOP_IS_READY_TO_FIRE,
# MAILOP_IS_NOT_READY_TO_FIRE, BASE_MAILOPS, CHOOSE_MAILOP,
# DYNAMIC_MAILOP and DYNAMIC_MAILOP_WITH_NACK without package qualification.
#
Mailop_Readiness == itt::Mailop_Readiness;
Mailop == itt::Mailop;
Base_Mailop(X) = itt::Base_Mailop(X);
# Condition variables.
#
# Because these variables are set inside
# atomic regions we have to use different
# conventions for clean-up, etc. Instead
# of requiring the set_up_mailopready_watch fate
# to call the finish_do1mailoprun fn and to end
# the uninterruptible scope, we call the finish_do1mailoprun
# function when setting the condition variable
# (in set_condvar__iu), and have the invariant
# that the set_up_mailopready_watch fate is dispatched
# outside the atomic region.
# Nomenclature: What I'm calling "uninterruptible_scope" is usually called "critical section" or "atomic region"
# in the literature. I dislike "critical" because it is vague. ("critical" in what sense? Who knows?)
# "atomic" is literally correct ("a-tomic" == "not cuttable" -- indivisible) but the modern reader is not
# likely to take it in that sense at first blush. And neither "section" nor "region" are as apropos as "scope".
# Set a condition variable.
# Caller guarantees that this function is always
# executed in an uninterruptible scope.
#
set_condvar__iu =  mps::set_condvar__iu;                # Re-exported unchanged from microthread_preemptive_scheduler.
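# (Commented-out local implementation, apparently superseded by the mps::set_condvar__iu binding above.)
#
# fun set_condvar__iu (itt::CONDITION_VARIABLE state)           # Only external call is in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
# =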
# case *state
# #
# itt::CONDVAR_IS_NOT_SET waiting_threads # waiting_threads is the list of threads sitting
# => # blocked waiting for this condvar to be set.
# { mps::foreground_run_queue -> rwq::RW_QUEUE { back, ... };
# #
# state := itt::CONDVAR_IS_SET 1; # Set the condition variable. NB: The '1' is a priority, not its (non-existent) value.
# #
# back := run waiting_threads # Add to foreground run queue all threads that were waiting for condvar to be set.
# where
# fun run [] => *back;
# #
# run ( { do1mailoprun_status=>REF itt::DO1MAILOPRUN_IS_COMPLETE, ... } ! rest)
# =>
# run rest; # Drop completed do1mailoprun.
#
# run ( { do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread), finish_do1mailoprun, fate } ! rest)
# =>
# { do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
# #
# finish_do1mailoprun (); # Do stuff like do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks.
#
# (thread, fate) ! (run rest);
# };
# end;
# end;
# };
#
# _ => error "condvar already set";
# esac;
# The mailop constructor for
# waiting on a condition variable:
#
fun wait_on_condvar' (itt::CONDITION_VARIABLE condvar_state)            # This fn is used (only) below and in   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
    =
    # Build a Mailop(Void) which is ready to fire once the given
    # condition variable is in state CONDVAR_IS_SET.
    #
    # The result is a single base mailop (is_mailop_ready_to_fire):
    #
    #   o If the condvar is set, it reports MAILOP_IS_READY_TO_FIRE
    #     with the condvar's current priority, and bumps that priority by one.
    #
    #   o If the condvar is not set, it reports MAILOP_IS_NOT_READY_TO_FIRE
    #     with a watch fn which appends the caller to the condvar's waiting list.
    #
    BASE_MAILOPS [is_mailop_ready_to_fire]
    where
        fun set_up_mailopready_watch                                    # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
              {
                do1mailoprun_status,                                    # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                finish_do1mailoprun,                                    # Does stuff like   do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;   and sending nacks.
                return_to__set_up_mailopready_watches__loop
              }
            =
            call_with_current_fate
                #
                (fn fate
                    =
                    {   waiting_threads                                 # The list of threads waiting for the condvar to be set.
                            =
                            case *condvar_state
                                #
                                itt::CONDVAR_IS_NOT_SET waiting_threads =>  waiting_threads;
                                itt::CONDVAR_IS_SET     _               =>  raise exception FAIL "Bug in wait_on_condvar'";
                            esac;                                       # Above exception should not happen: is_mailop_ready_to_fire() only hands out this fn when *condvar_state is not CONDVAR_IS_SET.
                        #
                        waiting_thread =  { do1mailoprun_status, finish_do1mailoprun, fate };
                        #
                        condvar_state :=  itt::CONDVAR_IS_NOT_SET (waiting_thread ! waiting_threads);   # Add ourself to list of threads waiting for condvar to be set.
                        #
                        return_to__set_up_mailopready_watches__loop ();                                 # Does not return.
                    }
                );
        #
        fun is_mailop_ready_to_fire ()                                  # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
            =
            case *condvar_state
                #
                itt::CONDVAR_IS_SET priority
                    =>
                    {   condvar_state :=  itt::CONDVAR_IS_SET (priority+1);     # Increment the condition-variable priority.
                        #
                        MAILOP_IS_READY_TO_FIRE { priority, fire_mailop }       # NB: reports the priority as it was *before* the increment.
                        where
                            fun fire_mailop ()                                  # Reppy refers to fire_mailop as 'doFn'.
                                =
                                {   condvar_state :=  itt::CONDVAR_IS_SET 1;    # Set condvar priority back to minimum.
                                    #
                                    log::uninterruptible_scope_mutex := 0;      # End uninterruptible scope.
                                                                                # NOTE(review): rule 3 in the file header says a fire_mailop should NOT end the
                                                                                # uninterruptible scope, but this one does -- confirm which is intended.
                                };
                        end;
                    };

                itt::CONDVAR_IS_NOT_SET _
                    =>
                    MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch;
            esac;
    end;
# A mailop which is always ready to fire
# and produces given result:
#
fun always' result                                                      # This is used a lot in (for example)   src/lib/std/src/socket/socket.pkg
    =
    # A single base mailop which always reports itself ready to fire,
    # with fixed priority (-1), and which yields 'result' when fired.
    #
    BASE_MAILOPS
      [
        fn () = itt::MAILOP_IS_READY_TO_FIRE
                  { priority    =>  -1,
                    fire_mailop =>  fn () = {   log::uninterruptible_scope_mutex := 0;         # Reppy refers to fire_mailop as 'doFn'.
                                                                                                # NOTE(review): this clears the scope mutex although header rule 3 says
                                                                                                # fire_mailop should not end the uninterruptible scope -- confirm intent.
                                                result;
                                            }
                  }
      ];
# A mailop which is never ready to fire:
#
# An empty set of base mailops, so there is nothing that could ever fire.
# (block_until_mailop_fires on this reaches do_nackfree_mailops [],
# which just dispatches the next thread.)
#
never' = BASE_MAILOPS [];                                               # Used in:   src/lib/x-kit/widget/basic/topwindow.pkg
                                                                        #            src/lib/x-kit/xclient/src/window/widget-cable.pkg
# These generate mailops on-the-fly while 'do_one_mailop' is running.
# The second is given a mailop with which to detect client
# abort of the generated mailop:
#
# Both are plain constructor re-exports:  the wrapped generator fn
# is not called here but later, when the mailop is prepared for running
# (see the DYNAMIC_MAILOP / DYNAMIC_MAILOP_WITH_NACK cases in 'prepare').
#
dynamic_mailop = DYNAMIC_MAILOP;
dynamic_mailop_with_nack = DYNAMIC_MAILOP_WITH_NACK;                    # This is mainly used in:   src/lib/std/src/io/winix-text-file-for-os-g.pkg
                                                                        # Also used in:             src/lib/std/src/io/winix-mailslot-io-g.pkg
                                                                        #                           src/lib/std/src/posix/winix-data-file-io-driver-for-posix.pkg
# Combine a list of mailops into a single mailop:
#
fun cat_mailops (mailops: List( Mailop(X) ))                            # This gets called in (for example):   src/lib/std/src/io/winix-mailslot-io-g.pkg
    =                                                                   # A frequent idiom is   block_until_mailop_fires (cat_mailops mailops);
    #
    # Combine a list of mailops into one mailop, flattening as we go:
    #
    #   o 'gather' runs while everything seen so far is a BASE_MAILOPS;
    #     it merges all their base mailops into one flat list and,
    #     if nothing else turns up, returns a single BASE_MAILOPS.
    #
    #   o On meeting anything else, 'gather' hands over to 'gather'',
    #     which builds a CHOOSE_MAILOP, splicing in the members of nested
    #     CHOOSE_MAILOPs and merging a BASE_MAILOPS into a BASE_MAILOPS at
    #     the head of the accumulator.
    #
    # The input is reversed first because both loops accumulate by prepending.
    #
    gather (reverse mailops, [])
    where
        fun gather ([], results)                            =>  BASE_MAILOPS results;           # Done, return results.
            #
            gather (BASE_MAILOPS []       ! rest, results)  =>  gather (rest, results);
            gather (BASE_MAILOPS [mailop] ! rest, results)  =>  gather (rest, mailop  ! results);
            gather (BASE_MAILOPS mailops  ! rest, results)  =>  gather (rest, mailops @ results);
            #
            gather (mailops, [])                            =>  gather' (mailops, []);                          # First non-BASE_MAILOPS seen: switch to the general loop.
            gather (mailops, l )                            =>  gather' (mailops, [BASE_MAILOPS l]);
        end

        also
        fun gather' ([], [mailop])      =>  mailop;                                                             # Exactly one survivor: no CHOOSE_MAILOP wrapper needed.
            gather' ([], mailops)       =>  CHOOSE_MAILOP mailops;
            #
            gather' (CHOOSE_MAILOP mailops ! rest, mailops')
                =>
                gather' (rest, mailops @ mailops');

            gather' (BASE_MAILOPS base_mailops ! rest, BASE_MAILOPS base_mailops' ! rest')
                =>
                gather' (rest, BASE_MAILOPS (base_mailops @ base_mailops') ! rest');

            gather' (mailop ! rest, mailops')
                =>
                gather' (rest, mailop ! mailops');
        end;
    end;
fun make_compound_mailop (mailop, added_action)
    =
    # Implements the "==>" op used in do_one_mailop [...] rules.
    #
    # Given
    #
    #     mailop:        Mailop(X)
    #     added_action:  X -> Y
    #
    # we return a Mailop(Y) which behaves like 'mailop' except that
    # whatever value the original would have produced is passed through
    # 'added_action' first.
    #
    # A base mailop is a fn   Void -> Mailop_Readiness   whose result
    # carries either a 'fire_mailop' fn (ready case) or a
    # 'set_up_mailopready_watch' fn (not-ready case);  in either case we
    # substitute a fn which runs the original and then applies 'added_action'
    # to its result.  Everything else is tree-walking.
    #
    rewrap mailop
    where
        # Wrap one base mailop.  Nothing is polled here:  we return a new
        # thunk which polls the original only when it is itself called.
        #
        fun wrap_one_base_mailop is_mailop_ready_to_fire
            =
            fn ()
                =
                case (is_mailop_ready_to_fire ())
                    #
                    MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch
                        =>
                        MAILOP_IS_NOT_READY_TO_FIRE  (fn watch_args = added_action (set_up_mailopready_watch watch_args));

                    MAILOP_IS_READY_TO_FIRE { priority, fire_mailop }
                        =>
                        MAILOP_IS_READY_TO_FIRE { priority, fire_mailop => (fn () = added_action (fire_mailop ())) };
                esac;

        # Walk the mailop tree applying the above at the leaves.
        # Dynamic mailops are wrapped lazily:  their generator fn still
        # runs only when the resulting mailop is eventually prepared.
        #
        fun rewrap (DYNAMIC_MAILOP_WITH_NACK make_mailop)  =>  DYNAMIC_MAILOP_WITH_NACK (fn nack_mailop = rewrap (make_mailop nack_mailop));
            rewrap (DYNAMIC_MAILOP           make_mailop)  =>  DYNAMIC_MAILOP           (fn ()          = rewrap (make_mailop ()));
            #
            rewrap (CHOOSE_MAILOP mailops)                 =>  CHOOSE_MAILOP (map rewrap mailops);
            rewrap (BASE_MAILOPS  base_mailops)            =>  BASE_MAILOPS  (map wrap_one_base_mailop base_mailops);
        end;
    end;
(==>) = make_compound_mailop;                                           # Infix synonym for readability:   mailop ==> action   is   make_compound_mailop (mailop, action).
#
fun make_exception_handling_mailop (mailop, exception_handler_fn)
    =
    # Return a mailop which behaves like 'mailop' except that any exception
    # raised while running one of its 'fire_mailop' or
    # 'set_up_mailopready_watch' fns is caught and handed to
    # 'exception_handler_fn', whose result is used in place of the normal one.
    #
    # Structure parallels make_compound_mailop:  wrap the leaves, walk the tree.
    #
    rewrap mailop
    where
        # Run   f arg,   diverting any exception to the handler.
        #
        fun guarded f arg
            =
            f arg
            except
                exn = exception_handler_fn exn;

        # Wrap one base mailop.  The original is polled only when
        # the returned thunk is itself called.
        #
        fun wrap_one_base_mailop is_mailop_ready_to_fire
            =
            fn ()
                =
                case (is_mailop_ready_to_fire ())
                    #
                    MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch
                        =>
                        MAILOP_IS_NOT_READY_TO_FIRE (guarded set_up_mailopready_watch);

                    MAILOP_IS_READY_TO_FIRE { priority, fire_mailop }
                        =>
                        MAILOP_IS_READY_TO_FIRE { priority, fire_mailop => guarded fire_mailop };
                esac;

        fun rewrap (DYNAMIC_MAILOP_WITH_NACK make_mailop)  =>  DYNAMIC_MAILOP_WITH_NACK (fn nack_mailop = rewrap (make_mailop nack_mailop));
            rewrap (DYNAMIC_MAILOP           make_mailop)  =>  DYNAMIC_MAILOP           (fn ()          = rewrap (make_mailop ()));
            #
            rewrap (CHOOSE_MAILOP mailops)                 =>  CHOOSE_MAILOP (map rewrap mailops);
            rewrap (BASE_MAILOPS  base_mailops)            =>  BASE_MAILOPS  (map wrap_one_base_mailop base_mailops);
        end;
    end;
# Result of preparing a mailop expression for running (see 'prepare'):
# all dynamic mailops have been expanded, leaving only base mailops
# grouped according to whether negative acknowledgements are involved.
#
Prepared_Mailops(X)
    = NACKFREE_MAILOPS List( Base_Mailop(X) )                           # A flat list of base mailops with no nack condvar attached.
    | NACKFULL_MAILOPS List( Prepared_Mailops(X) )                      # A list of sub-groups, built once any WITHNACK_MAILOP has been seen.
    | WITHNACK_MAILOP (itt::Condition_Variable, Prepared_Mailops(X))    # A group produced by a DYNAMIC_MAILOP_WITH_NACK, paired with its nack condvar.
    ;
/* +DEBUG
fun sayGroup (msg, eg) = let
fun f (NACKFREE_MAILOPS l, sl) = "NACKFREE_MAILOPS(" ! int::to_string (list::length l) ::()) ! sl
| f (NACKFULL_MAILOPS l, sl) = "NACKFULL_MAILOPS(" ! g (l, ")" ! sl)
| f (WITHNACK_MAILOP l, sl) = "WITHNACK_MAILOP(" ! f(#2 l, ")" ! sl)
also g ([], sl) = sl
| g ([x], sl) = f (x, sl)
| g (x ! r, sl) = f (x, ", " ! g (r, sl))
in
Debug::sayDebugId (string::cat (msg ! ": " ! f (eg, ["\n"])))
end
-DEBUG*/
# Prepare mailop expression to run.
# In particular, this evaluates all
# dynamic rules to generate the actual
# final rules to do_one_mailop between:
#
# Convert a Mailop(X) into a Prepared_Mailops(X).
#
#   o A bare BASE_MAILOPS is returned directly as NACKFREE_MAILOPS (fast path).
#   o DYNAMIC_MAILOP generators are called and their results prepared in turn.
#   o Each DYNAMIC_MAILOP_WITH_NACK gets a fresh, unset condition variable;
#     its generator is handed   wait_on_condvar' condvar   as the nack mailop
#     and the result is wrapped in WITHNACK_MAILOP.
#   o CHOOSE_MAILOP members are prepared one by one and merged.
#
fun prepare (BASE_MAILOPS l)
        =>
        NACKFREE_MAILOPS l;
    prepare mailop
        =>
        prepare' mailop
        where
            fun prepare' (DYNAMIC_MAILOP make_mailop)
                    =>
                    prepare' (make_mailop ());                          # Generate the mailop now, then prepare whatever came back.
                prepare' (DYNAMIC_MAILOP_WITH_NACK f)
                    =>
                    { condvar = itt::CONDITION_VARIABLE (REF (itt::CONDVAR_IS_NOT_SET []));    # Fresh condvar, initially unset with no waiters.
                      #
                      WITHNACK_MAILOP (condvar, prepare' (f (wait_on_condvar' condvar)));
                    };
                prepare' (BASE_MAILOPS mailops)
                    =>
                    NACKFREE_MAILOPS mailops;
                prepare' (CHOOSE_MAILOP mailops)
                    =>
                    prepare_mailops (mailops, [])                       # Optimistically assume nackfree; we'll fall back to the prepare_nackfull_mailops() routine if we're wrong.
                    where
                        # Nack-free loop:  second argument accumulates base mailops.
                        #
                        fun prepare_mailops ([], mailops)
                                =>
                                NACKFREE_MAILOPS mailops;
                            prepare_mailops (mailop ! rest, mailops')
                                =>
                                case (prepare' mailop)
                                    #
                                    NACKFREE_MAILOPS mailops => prepare_mailops (rest, mailops @ mailops' );
                                    NACKFULL_MAILOPS mailops => prepare_nackfull_mailops (rest, mailops @ [NACKFREE_MAILOPS mailops']);
                                    /* */ mailops => prepare_nackfull_mailops (rest, [mailops, NACKFREE_MAILOPS mailops']);    # Only WITHNACK_MAILOP can reach this arm.
                                esac;
                        end
                        also
                        # General loop:  second argument accumulates Prepared_Mailops groups.
                        #
                        fun prepare_nackfull_mailops ([], [group]) => group;   # The general case vs prepare_mailops above handling the nice simple (and common) case.
                            prepare_nackfull_mailops ([], l) => NACKFULL_MAILOPS l;
                            #
                            prepare_nackfull_mailops (mailop ! rest, l)
                                =>
                                case (prepare' mailop, l)
                                    #
                                    (NACKFREE_MAILOPS mailops, NACKFREE_MAILOPS mailops' ! rest')
                                        =>
                                        prepare_nackfull_mailops (rest, NACKFREE_MAILOPS (mailops @ mailops') ! rest');        # Merge adjacent nack-free groups.
                                    (NACKFULL_MAILOPS mailops, l) => prepare_nackfull_mailops (rest, mailops @ l);
                                    ( mailops, l) => prepare_nackfull_mailops (rest, mailops ! l);                             # Here 'mailops' can be NACKFREE_MAILOPS or WITHNACK_MAILOP.
                                esac;
                        end;
                    end;
            end;
        end;
end;
stipulate
    #
    tie_break_counter = REF 0;                                          # Cycles through 0, 1, ..., 1000000 and then back to 0.
    #
    # Return   (current counter value) rem i,   then advance the counter.
    # Used to rotate among 'i' equally-ranked candidates so that
    # no one of them is consistently passed over.
    #
    fun pick_fairly i
        =
        {   current =  *tie_break_counter;
            #
            next =  case current
                        #
                        1000000 =>  0;
                        _       =>  current + 1;
                    esac;
            #
            tie_break_counter :=  next;
            #
            int::rem (current, i);
        };
herein
    #
    # Given a list of (priority, payload) pairs for the ready mailops and
    # their count 'n', return the payload with the highest priority.
    # A priority of -1 ("fixed priority") is treated as priority 'n'.
    # Ties for highest are broken via pick_fairly.
    #
    fun pick_highest_priority_mailop_breaking_ties_fairly ([(_, fire_mailop)], _)
            =>
            fire_mailop;                                                # Only one choice -- easy work!

        pick_highest_priority_mailop_breaking_ties_fairly (candidates, n)
            =>
            scan (candidates, 0, 0, [])
            where
                #
                fun effective_priority p
                    =
                    if (p == -1)   n;
                    else           p;
                    fi;
                #
                # Arguments:  remaining candidates, best priority so far,
                # number of payloads tied at that priority, those payloads.
                #
                fun scan ([], _, _, [winner])
                        =>
                        winner;                                         # Unique highest-priority payload.

                    scan ([], _, tie_count, tied)
                        =>
                        list::nth (tied, pick_fairly tie_count);        # Several tied for highest -- rotate among them.

                    scan ((raw_priority, fire_mailop) ! rest, best_p, tie_count, tied)
                        =>
                        {   p =  effective_priority raw_priority;
                            #
                            if   (p >  best_p)   scan (rest, p,      1,             [fire_mailop]     );        # New leader -- discard earlier candidates.
                            elif (p == best_p)   scan (rest, best_p, tie_count + 1, fire_mailop ! tied);        # Ties current leader -- remember it too.
                            else                 scan (rest, best_p, tie_count,     tied              );        # Lower priority -- ignore it.
                            fi;
                        };
                end;
            end;
    end;
end;
#
fun make__do1mailoprun_status__and__finish_do1mailoprun ()
    =
    # Allocate the per-run bookkeeping for one blocking do1mailoprun:
    #
    #   do1mailoprun_status:   a fresh REF, initially DO1MAILOPRUN_IS_BLOCKED on the current thread.
    #   finish_do1mailoprun:   a thunk which marks that REF DO1MAILOPRUN_IS_COMPLETE.
    #
    { do1mailoprun_status, finish_do1mailoprun }
    where
        do1mailoprun_status =  REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_thread()));
        #
        finish_do1mailoprun =  .{ do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE; };
    end;
stipulate
# When we have exactly one mailop in the do_one_mailop[...] we can use simple logic:
#
fun do_one_mailop (is_mailop_ready_to_fire: Base_Mailop(X))
    =
    # Run a single base mailop:  enter the uninterruptible scope, poll it once,
    # then either fire it immediately or set up its watch and let
    # another thread run.
    #
    {   log::uninterruptible_scope_mutex := 1;
        #
        readiness =  is_mailop_ready_to_fire ();
        #
        case readiness
            #
            MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch
                =>
                {   (make__do1mailoprun_status__and__finish_do1mailoprun ())
                        ->
                        { do1mailoprun_status, finish_do1mailoprun };
                    #
                    set_up_mailopready_watch
                      {
                        do1mailoprun_status,
                        finish_do1mailoprun,
                        return_to__set_up_mailopready_watches__loop  =>  mps::dispatch_next_thread__usend__noreturn     # Since we have only one mailop, we do not need to actually loop here.
                      };
                };

            MAILOP_IS_READY_TO_FIRE { fire_mailop, ... }                # Reppy refers to 'fire_mailop' as 'doFn'.
                =>
                fire_mailop ();
        esac;
    };
# Outcome of polling every mailop in a nack-free list once.
#
Test_Mailops_For_Readiness_To_Fire__Result(X)
    #
    = NO_READY_MAILOPS { start_mailop_watch__fns: List (itt::Set_Up_Mailopready_Watch__Fn(X)) }        # Nothing ready:  one watch-setup fn per polled mailop.
    | READY_MAILOPS { fire_mailop_fns: List ((Int, (Void -> X))), n: Int }                             # (priority, fire fn) for each ready mailop;  'n' is how many there are.
    ;
herein
# This function handles the case of picking
# and firing one of a list of mailops thunks
# (w/o any negative acknowledgements).
#
# It also handles NEVER.
#
# Pick and fire one mailop from a list of base mailops with no nacks involved.
#
#   []        Nothing can ever fire:  just run some other thread.
#   [mailop]  Delegated to the simpler single-mailop routine.
#   else      Poll every mailop once inside the uninterruptible scope.
#             If any are ready, fire the highest-priority one.  Otherwise give each
#             mailop's watch-setup fn a turn, then run some other thread.
#
fun do_nackfree_mailops [] => mps::dispatch_next_thread__noreturn ();   # 'do_one_mailop' with empty rule list -- nothing to do.
    do_nackfree_mailops [mailop] => do_one_mailop mailop;              # This is the only call to do_one_mailop().
    #
    do_nackfree_mailops mailops
        =>
        {
            log::uninterruptible_scope_mutex := 1;                      # Start uninterruptible scope.  (Aka "critical section", "atomic region" etc.)
            #
            case (test_mailops_for_readiness_to_fire (mailops, []))
                #
                READY_MAILOPS { fire_mailop_fns, n }
                    =>
                    { fire_mailop = pick_highest_priority_mailop_breaking_ties_fairly (fire_mailop_fns, n);    # Pick a do_one_mailop[....] mailop to fire ...
                      #
                      fire_mailop ();                                                                           # ... and then fire it.
                    };
                NO_READY_MAILOPS { start_mailop_watch__fns }
                    =>
                    call_with_current_control_fate
                        #
                        (fn fate
                            =
                            { set_up_mailopready_watches__loop start_mailop_watch__fns;
                              #
                              error "[set_up_mailopready_watches__loop]";                                       # Above should never return.
                            }
                            where
                                switch_to_control_fate = switch_to_control_fate fate;                           # Shadows the outer fn with one already bound to our resume point.
                                #
                                (make__do1mailoprun_status__and__finish_do1mailoprun ())                        # One status/finish pair shared by every watch in this run.
                                    ->
                                    { do1mailoprun_status, finish_do1mailoprun };
                                #
                                fun set_up_mailopready_watches__loop [] => mps::dispatch_next_thread__usend__noreturn ();      # All watches set up -- run another thread.
                                    #
                                    set_up_mailopready_watches__loop (set_up_mailopready_watch ! rest)
                                        =>
                                        switch_to_control_fate                                                  # If the watch fn returns a value, deliver it to 'fate' as our result.
                                            #
                                            (set_up_mailopready_watch
                                                { do1mailoprun_status,
                                                  finish_do1mailoprun,
                                                  return_to__set_up_mailopready_watches__loop                   # maildrop.pkg, mailslot.pkg etc call this to return control to us.
                                                      =>
                                                      fn () = set_up_mailopready_watches__loop rest
                                                }
                                            );
                                end;
                            end
                        );
            esac;
        }
        where
            #
            fun test_mailops_for_readiness_to_fire (is_mailop_ready_to_fire ! rest, start_mailop_watch__fns)    # In this loop we have not yet found any mailops ready to fire.
                    =>
                    case (is_mailop_ready_to_fire ())
                        #
                        MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch => test_mailops_for_readiness_to_fire (rest, set_up_mailopready_watch ! start_mailop_watch__fns);
                        MAILOP_IS_READY_TO_FIRE { priority, fire_mailop } => test_mailops_for_readiness_to_fire' (rest, [(priority, fire_mailop)], 1);         # Found one: watch fns collected so far are dropped.
                    esac;
                test_mailops_for_readiness_to_fire ([], start_mailop_watch__fns)                                # Done -- no ready-to-fire mailops found and no candidates left to check.
                    =>
                    NO_READY_MAILOPS { start_mailop_watch__fns };                                               # No mailops were ready to fire; return list of fns which
            end                                                                                                 # each start a watch on one mailop for readiness to fire.
            also
            fun test_mailops_for_readiness_to_fire' (is_mailop_ready_to_fire ! rest, fire_mailop_fns, n)        # In this loop we have found at least one mailop which is ready to fire.
                    =>
                    case (is_mailop_ready_to_fire ())                                                           # NB: remaining mailops are still polled even though one is already ready.
                        #
                        MAILOP_IS_READY_TO_FIRE { priority, fire_mailop } => test_mailops_for_readiness_to_fire' (rest, (priority, fire_mailop) ! fire_mailop_fns, n+1);
                        _ => test_mailops_for_readiness_to_fire' (rest, fire_mailop_fns, n );
                    esac;
                test_mailops_for_readiness_to_fire' ([], fire_mailop_fns, n)
                    =>
                    READY_MAILOPS { fire_mailop_fns, n };                                                       # At least one mailop was ready to fire; return the fns which
                                                                                                                # will fire the ready-to-fire mailops.
            end;                                                                                                # fun test_mailops_for_readiness_to_fire'
            # NOTE: Maybe we should just keep
            # track of the max priority above?
            # What about fairness to fixed
            # priority mailops (e::g., always, timeout?)
        end;                                                                                                    # where
end;                                                                                                            # fun do_nackfree_mailops
end; # stipulate
stipulate
# Walk the mailop group tree,
# collecting the base mailops
# (with associated ack flags),
# also a list of nackstates.
#
# A nackstate is a
# (Condvar, List(Flag(Ack)))
# pair, where the flags are
# those associated with the
# mailops covered by the nack
# condvar.
#
# Flatten a Prepared_Mailops tree into
#
#     ( List( (base_mailop, flag) ),  nackstates )
#
# where each 'flag' is a Ref(Bool) initialised to FALSE and each nackstate is a
# (condvar, flags) pair listing the flags of every base mailop found beneath
# that WITHNACK_MAILOP.
#
# Base mailops found directly under a top-level NACKFULL_MAILOPS (i.e. not
# beneath any WITHNACK_MAILOP) all share one flag, 'un_wrapped_flag', which
# appears in no nackstate.  Everywhere else each base mailop gets its own flag.
#
# 'bl' is the accumulated (base_mailop, flag) list throughout.
#
fun gather_base_mailops_and_nackstates mailops
    =
    case mailops
        #
        NACKFULL_MAILOPS _
            =>
            gather_mailops (mailops, [], [])
            where
                un_wrapped_flag = REF FALSE;                            # Shared by all base mailops not covered by any nack condvar.
                #
                fun reverse_and_prepend_mailops (mailop ! rest, results) => reverse_and_prepend_mailops (rest, (mailop, un_wrapped_flag) ! results);
                    reverse_and_prepend_mailops ( [], results) => results;
                end;
                #
                fun gather_mailops (NACKFREE_MAILOPS nackfree_mailops, bl, nackstates)
                        =>
                        (reverse_and_prepend_mailops (nackfree_mailops, bl), nackstates);
                    gather_mailops (NACKFULL_MAILOPS nackfull_mailops, bl, nackstates)
                        =>
                        fold_forward f (bl, nackstates) nackfull_mailops
                        where
                            fun f (group', (bl', nackstates'))
                                =
                                gather_mailops (group', bl', nackstates');
                        end;
                    gather_mailops (withnack as WITHNACK_MAILOP _, bl, nackstates)
                        =>
                        gather_wrapped (withnack, bl, nackstates);      # From here down every base mailop gets an individual flag.
                end;
            end;
        group => gather_wrapped (group, [], []);                        # Top level is NACKFREE_MAILOPS or WITHNACK_MAILOP.
    esac
    where
        # Like 'gather' below, but discards the all_flags component of its result.
        #
        fun gather_wrapped (group, bl, nackstates)
            =
            { (gather (group, bl, [], nackstates))
                  ->
                  (bl, _, nackstates);
              (bl, nackstates);
            }
            where
                # Returns (bl, all_flags, nackstates), where 'all_flags' collects
                # the flags of every base mailop visited in this subtree.
                #
                fun gather (NACKFREE_MAILOPS mailops, bl, all_flags, nackstates)
                        =>
                        { (reverse_and_prepend_mailops (mailops, bl, all_flags))
                              ->
                              (bl', all_flags');
                          (bl', all_flags', nackstates);
                        }
                        where
                            fun reverse_and_prepend_mailops ([], bl, all_flags)
                                    =>
                                    (bl, all_flags);
                                reverse_and_prepend_mailops (mailop ! rest, bl, all_flags)
                                    =>
                                    { flag = REF FALSE;                 # Fresh flag per base mailop.
                                      #
                                      reverse_and_prepend_mailops (rest, (mailop, flag) ! bl, flag ! all_flags);
                                    };
                            end;
                        end;
                    gather (NACKFULL_MAILOPS group, bl, all_flags, nackstates)
                        =>
                        fold_forward f (bl, all_flags, nackstates) group
                        where
                            fun f (group', (bl', all_flags', nackstates'))
                                =
                                gather (group', bl', all_flags', nackstates');
                        end;
                    gather (WITHNACK_MAILOP (condvar, group), bl, all_flags, nackstates)
                        =>
                        { (gather (group, bl, [], nackstates))          # Start a fresh flag list for this condvar's subtree ...
                              ->
                              (bl', all_flags', nackstates');
                          ( bl',
                            all_flags' @ all_flags,                     # ... then pass those flags up to any enclosing WITHNACK_MAILOP as well,
                            (condvar, all_flags') ! nackstates'         # and record them against this condvar.
                          );
                        };
                end;                                                    # fun gather
            end;                                                        # where
    end;                                                                # where
# Outcome of polling every mailop once in the nack-aware case.
# Same shape as the nack-free version, except that every fn
# is paired with the Ref(Bool) flag of the base mailop it came from.
#
Test_Mailops_For_Readiness_To_Fire__Result(X)
    #
    = NO_READY_MAILOPS { start_mailop_watch__fns: List ((itt::Set_Up_Mailopready_Watch__Fn(X), Ref(Bool))) }
    | READY_MAILOPS { fire_mailop_fns: List ((Int, ((Void -> X), Ref(Bool)))), n: Int }
    ;
herein
# This function handles the more
# complicated case of running a
# mailop expression where negative
# acknowledgements are involved.
#
# Pick and fire one mailop from a prepared group which involves nack condvars.
#
# Works like do_nackfree_mailops, with this addition:  whichever mailop
# fires has its flag set to TRUE, after which send_nacks_as_appropriate()
# sets every nack condvar none of whose flags is TRUE.
#
fun do_nackfull_mailops group
    =
    {
        log::uninterruptible_scope_mutex := 1;                          # Start uninterruptible scope (aka "critical section" aka "atomic region"...)
        #
        case (test_mailops_for_readiness_to_fire (mailops, []))
            #
            READY_MAILOPS { fire_mailop_fns, n }
                =>
                { (pick_highest_priority_mailop_breaking_ties_fairly (fire_mailop_fns, n))
                      ->
                      (fire_mailop, flag);
                  flag := TRUE;                                         # Mark the chosen mailop ...
                  send_nacks_as_appropriate ();                         # ... nack the groups which do not contain it ...
                  fire_mailop ();                                       # ... and only then fire it.
                };
            NO_READY_MAILOPS { start_mailop_watch__fns }
                =>
                call_with_current_control_fate
                    (fn fate
                        =
                        { set_up_mailopready_watches__loop start_mailop_watch__fns;     # Ask each mailop in the do_one_mailop [...] list to enqueue itself as appropriate.
                          #
                          error "[set_up_mailopready_watches__loop]";                   # This cannot happen -- set_up_mailopready_watches__loop() runs next thread when done.
                        }
                        where
                            switch_to_control_fate = switch_to_control_fate fate;       # Shadows the outer fn with one already bound to our resume point.
                            #
                            do1mailoprun_status = REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_thread ()));     # 'do1mailoprun_status' is basically a mutex ensuring exactly one mailop fires per do1mailoprun.
                            #
                            fun finish_do1mailoprun_fn flag ()                          # This will be called by the first mailop to fire.
                                =
                                { do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
                                  flag := TRUE;
                                  send_nacks_as_appropriate ();
                                };
                            #
                            fun set_up_mailopready_watches__loop [] => mps::dispatch_next_thread__usend__noreturn ();
                                #
                                set_up_mailopready_watches__loop ((set_up_mailopready_watch, flag) ! rest)
                                    =>
                                    switch_to_control_fate
                                        (
                                            set_up_mailopready_watch
                                                {
                                                    do1mailoprun_status,
                                                    finish_do1mailoprun => finish_do1mailoprun_fn flag,         # Each watch gets a finish fn bound to its own mailop's flag.
                                                    return_to__set_up_mailopready_watches__loop                 # maildrop.pkg, mailslot.pkg etc call this to return control to us.
                                                        =>
                                                        fn () = set_up_mailopready_watches__loop rest
                                                }
                                        );
                            end;                                                        # fn set_up_mailopready_watches__loop
                        end                                                             # where
                    );
        esac;
    }
    where
        (gather_base_mailops_and_nackstates group)                      # Flatten the group tree once, up front.
            ->
            (mailops, nackstates);
        #
        fun send_nacks_as_appropriate ()
            =
            apply check_condvar nackstates                              # NB: We capture 'nackstates' here for later use in 'finish_do1mailoprun_fn'.
            where
                # check_condvar checks the flags of a nackstate.
                # If they are all FALSE then the
                # corresponding condvar is set to signal
                # the negative ack.
                #
                fun check_condvar (condvar, flags)
                    =
                    check_flags flags
                    where
                        fun check_flags ((REF TRUE) ! _) => ();         # A mailop under this condvar fired -- no nack.
                            check_flags (_ ! rest) => check_flags rest;
                            check_flags [] => set_condvar__iu condvar;
                        end;
                    end;
            end;
        #
        fun test_mailops_for_readiness_to_fire ((is_mailop_ready_to_fire, flag) ! rest, start_mailop_watch__fns)       # In this loop we have not yet found a mailop ready to fire.
                =>
                case (is_mailop_ready_to_fire ())
                    #
                    MAILOP_IS_READY_TO_FIRE { priority, fire_mailop } => test_mailops_for_readiness_to_fire' (rest, [(priority, (fire_mailop, flag))], 1);
                    MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch => test_mailops_for_readiness_to_fire (rest, (set_up_mailopready_watch, flag) ! start_mailop_watch__fns);
                esac;
            test_mailops_for_readiness_to_fire ([], start_mailop_watch__fns)           # Done -- no ready-to-fire mailops found and none left to check,
                =>                                                                      # so we must block until some mailop becomes ready to fire.
                NO_READY_MAILOPS { start_mailop_watch__fns };
        end
        also
        fun test_mailops_for_readiness_to_fire' ((is_mailop_ready_to_fire, flag) ! rest, fire_mailop_fns, n)           # In this loop we have found at least one do_one_mailop[...] mailop ready to fire.
                =>
                case (is_mailop_ready_to_fire ())
                    #
                    MAILOP_IS_READY_TO_FIRE { priority, fire_mailop }
                        =>
                        test_mailops_for_readiness_to_fire' (rest, (priority, (fire_mailop, flag)) ! fire_mailop_fns, n+1);
                    _ => test_mailops_for_readiness_to_fire' (rest, fire_mailop_fns, n);
                esac;
            test_mailops_for_readiness_to_fire' ([], fire_mailop_fns, n)               # Done -- return the ready mailops; our caller picks one and fires it.
                =>
                READY_MAILOPS { fire_mailop_fns, n };
        end;
        # NOTE: Maybe above we should just
        # keep track of the max priority?
        # What about fairness to fixed
        # priority mailops (e::g., always, timeout?)
        #
    end;                                                                # fun do_nackfull_mailops
end; # stipulate
#
fun block_until_mailop_fires mailop                                     # External entrypoint.
    =
    # Prepare the mailop expression (expanding dynamic mailops), then
    # run it via the nack-free routine when possible and via the general
    # nack-aware routine otherwise.
    #
    {   prepared =  prepare mailop;
        #
        case prepared
            #
            NACKFREE_MAILOPS base_mailops =>  do_nackfree_mailops base_mailops;
            _                             =>  do_nackfull_mailops prepared;
        esac;
    };
# 'do_one_mailop' is our core entrypoint:  it is the 'do_one_mailop' used by
# clients to handle multiple mail sources from a single thread, via
# statements looking like
#
# do_one_mailop [
# foo' ==> .{ do_this (); },
# bar' ==> .{ do_that (); },
# ...
# ];
#
# We have two main cases:
# 1) If one or more mailops in the list is ready to fire, we pick one and fire it.
# 2) If no mailop in the list is ready to fire, we must block until one becomes ready, then continue as in 1).
fun do_one_mailop mailops
    =
    # Choose among a list of mailops and fire exactly one of them,
    # blocking the calling thread until one is ready if need be.
    #
    # Preparation needs to:
    #
    #   o Expand DYNAMIC_MAILOP and DYNAMIC_MAILOP_WITH_NACK clauses, which
    #     generate mailops on the fly while 'do_one_mailop' is running.
    #
    #   o Figure out whether any WITH_NACK clauses are present.  In the common
    #     case of none we get NACKFREE_MAILOPS _ and take the simple path;
    #     otherwise we get NACKFULL_MAILOPS _ or WITHNACK_MAILOP _ and take
    #     the general path.
    #
    # All of that is exactly what 'prepare' does for a CHOOSE_MAILOP, and the
    # subsequent dispatch is exactly block_until_mailop_fires, so instead of
    # keeping a second copy of the preparation helpers here we wrap the list
    # in CHOOSE_MAILOP and go through the shared code:
    #
    #     prepare (CHOOSE_MAILOP ms)   ==   prepare_mailops (ms, [])
    #
    # We deliberately do NOT go through cat_mailops, which would reorder
    # and flatten the list before preparation.
    #
    block_until_mailop_fires (CHOOSE_MAILOP mailops);
}; # package mailop
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2013,
## released per terms of SMLNJ-COPYRIGHT.


