## maildrop.pkg # Derives from cml/src/core-cml/sync-var.sml
#
# Maildrops are essentially concurrency-safe replacements for REF cells:
#
# o Attempting to read from an empty maildrop will block the reading
# microthread until some other thread deposits a value in the maildrop.
#
# o 'take' ops remove the value from a maildrop, leaving it empty.
#
# o 'peek' ops read the value from a maildrop while leaving it unchanged.
#
# o The value stored in a maildrop may be removed, replaced or swapped.
# Compiled by:
#
src/lib/std/standard.lib### "We're fools whether we dance or not,
### so we might as well dance."
###
### -- Japanese proverb
stipulate
package fat = fate; # fate is from
src/lib/std/src/nj/fate.pkg package itt = internal_threadkit_types; # internal_threadkit_types is from
src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg package rwq = rw_queue; # rw_queue is from
src/lib/src/rw-queue.pkg package mps = microthread_preemptive_scheduler; # microthread_preemptive_scheduler is from
src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg #
call_with_current_fate = fat::call_with_current_fate;
switch_to_fate = fat::switch_to_fate;
herein
package maildrop
: Maildrop # Maildrop is from
src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.api {
exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;
#
Fate(X) = fat::Fate(X);
#
Maildrop(X) = MAILDROP { read_q: rwq::Rw_Queue( (Ref(itt::Do1mailoprun_Status), Fate(X)) ),
value: Ref( Null_Or(X) )
};
fun maildrop_to_string (MAILDROP { read_q, value }, name) # Debug support, primarily to textify mailslot state for logging via log::note or such.
=
{ sprintf "{MD:%s %s [%s] }"
#
name
#
case *value NULL => "EMPTY";
_ => "FULL";
esac
#
(sprint_readqueue read_q);
}
where
fun sprint_thread thread
=
{ thread -> itt::MICROTHREAD { thread_id, task, ... };
task -> itt::APPTASK { task_name, task_id, ... };
#
sprintf "%d:%d" thread_id task_id;
};
#
fun sprint_readqueue q
=
{
(string::join " " (map sprint_q_entry (rwq::frontq q))) + "
|" +
(string::join " " (map sprint_q_entry (rwq::backq q)));
}
where
fun sprint_q_entry (REF (itt::DO1MAILOPRUN_IS_COMPLETE), _)
=>
"*";
sprint_q_entry (REF (itt::DO1MAILOPRUN_IS_BLOCKED microthread), _)
=>
sprint_thread microthread;
end;
end;
end;
fun same_cell ( MAILDROP { value => v1, ... },
MAILDROP { value => v2, ... }
)
=
v1 == v2;
fun make_do1mailoprun_status ()
=
REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_microthread()));
fun mark_do1mailoprun_complete_and_return_thread (do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED microthread_id))
=>
{ do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
#
microthread_id;
};
mark_do1mailoprun_complete_and_return_thread (REF (itt::DO1MAILOPRUN_IS_COMPLETE))
=>
raise exception DIE "Compiler bug: Attempt to cancel already-cancelled transaction-id"; # Never happens; here to suppress 'nonexhaustive match' compile warning.
end;
Qy_Item(X)
#
= NO_ITEM
| ITEM ((Ref itt::Do1mailoprun_Status, Fate(X)))
;
# Functions to clean channel input and output queues
#
stipulate
fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest) # Drop any cancelled transactions at start of list.
=>
clean rest;
clean l => l; # Stop at first non-COMPLETE entry in list -- otherwise
end; # clean_queue_and_remove_one_entry will be O(N**2) instead of O(N).
fun clean_rev ([], result) # Drop all cancelled transactions from list; result is in reverse order.
=>
result;
clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest, result) # Drop cancelled transaction.
=>
clean_rev (rest, result);
clean_rev (x ! rest, result)
=>
clean_rev (rest, x ! result);
end;
herein
fun clean_queue_and_remove_one_entry (rwq::RW_QUEUE { front, back, ... } ) # Drop cancelled transactions and return first non-cancelled one.
=
clean_front *front
where
fun clean_front [] => clean_back *back;
#
clean_front f
=>
case (clean f)
#
[] => clean_back *back;
(item ! rest) => { front := rest;
#
ITEM item;
};
esac;
end
also
fun clean_back [] => NO_ITEM;
#
clean_back r
=>
{ back := [];
#
case (clean_rev (r, []))
#
[] => NO_ITEM;
item ! rest
=>
{ front := rest;
#
ITEM item;
};
esac;
};
end;
end;
end; # stipulate
# When a microthread is resumed after being blocked
# on a maildrop 'get' or 'take' op there may
# be other threads also blocked on the maildrop.
#
# This function is used to propagate the message
# to the threads that are blocked on the maildrop.
#
# It must be called from an uninterruptible scope.
# When the read_q is finally empty we exit the
# uninterruptible scope.
#
# We must use "clean_queue_and_remove_one_entry"
# to get items from the read_q to cover the unlikely
# case that a single microthread executes a choice of
# multiple gets on the same maildrop.
# # Called from get_from_maildrop and get_from_maildrop'.
fun wake_remaining_microthreads_waiting_to_read_maildrop__xu (read_q, v) # 'v' is the value being stored into the maildrop.
=
case (clean_queue_and_remove_one_entry read_q)
#
NO_ITEM => {
log::uninterruptible_scope_mutex := 0;
};
ITEM (do1mailoprun_status, get_v)
=>
call_with_current_fate
(\\ old_fate
=
{ new_thread = mark_do1mailoprun_complete_and_return_thread do1mailoprun_status;
#
mps::enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread, old_fate };
#
switch_to_fate get_v v; # In essence do get_v(v) and continue interrupted 'get' or 'take' op.
# The fn we return to will call us right back to continue processing read_q
} # unless it is a 'take', which leaves no value to be read.
);
esac;
fun impossible ()
=
raise exception DIE "maildrop: impossible";
fun make_empty_maildrop () # Id calls these "m-variables". (Thought you'd want to know.)
=
MAILDROP { value => REF NULL,
read_q => rwq::make_rw_queue ()
};
fun make_full_maildrop initial_value
=
MAILDROP { read_q => rwq::make_rw_queue (),
value => REF (THE initial_value)
};
same_maildrop = same_cell;
fun put_in_maildrop (maildrop as MAILDROP { read_q, value }, v) # 'v' is value being stored into the maildrop.
=
{
microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "put_in_maildrop";
log::uninterruptible_scope_mutex := 1;
#
case *value
#
NULL =>
{
value := THE v;
#
case (clean_queue_and_remove_one_entry read_q)
#
NO_ITEM => log::uninterruptible_scope_mutex := 0;
ITEM (do1mailoprun_status, get_v)
=>
call_with_current_fate
#
(\\ old_fate
=
{ new_thread = mark_do1mailoprun_complete_and_return_thread
do1mailoprun_status;
#
mps::enqueue_old_thread_plus_old_fate_then_install_new_thread
{ new_thread, old_fate };
switch_to_fate get_v v; # Do the get_v(v) call that some get_* or take_* is waiting for.
}
);
esac;
};
THE _ =>
{ log::uninterruptible_scope_mutex := 0;
#
raise exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;
};
esac;
};
fun take_from_maildrop' (MAILDROP { read_q, value } )
=
itt::BASE_MAILOPS [is_mailop_ready_to_fire]
where
fun suspend_then_eventually_fire_mailop # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # This typically does do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and then sends nacks as appropriate.
return_to__suspend_then_eventually_fire_mailops__loop # After starting up a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
}
=
# This fn gets used in
#
#
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# suspend_then_eventually_fire_mailop ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{
(call_with_current_fate
(\\ fate
=
{ rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, fate));
#
return_to__suspend_then_eventually_fire_mailops__loop (); # Return control to mailop.pkg
impossible (); # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
}
)
)
-> v; # Execution will pick up here when fill() (above) eventually does: switch_to_fate get_v v;
finish_do1mailoprun (); # Remember that this do_one_mailop[] call has fired a mailop -- no other mailop in call is eligible to fire.
value := NULL; # Empty the maildrop.
log::uninterruptible_scope_mutex := 0; # End uninterruptible scope.
v; # Return value read from maildrop.
};
fun is_mailop_ready_to_fire () # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'
=
case *value
#
NULL => itt::UNREADY_MAILOP suspend_then_eventually_fire_mailop;
#
THE v => itt::READY_MAILOP
{
fire_mailop => {. value := NULL; # Reppy refers to 'fire_mailop' as 'doFn'.
log::uninterruptible_scope_mutex := 0;
v;
}
};
esac;
end;
fun nonblocking_take_from_maildrop (MAILDROP { read_q, value } )
=
{
microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "nonblocking_take_from_maildrop";
log::uninterruptible_scope_mutex := 1;
#
case *value
#
THE v => { value := NULL;
log::uninterruptible_scope_mutex := 0;
THE v;
};
NULL => NULL;
esac;
};
fun take_from_maildrop (maildrop as MAILDROP { read_q, value } ) # Destructive read.
=
{
microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "take_from_maildrop";
log::uninterruptible_scope_mutex := 1;
#
case *value
#
THE v => {
value := NULL;
log::uninterruptible_scope_mutex := 0;
v;
};
NULL => {
(call_with_current_fate
(\\ get_v = { rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), get_v));
#
mps::dispatch_next_thread__xu__noreturn ();
}
)) -> v; # When get_v(v) finally gets called, the 'v' winds up here.
value := NULL;
# Note that we do not call wake_remaining_microthreads_waiting_to_read_maildrop__xu()
# here because there is no value left in maildrop for them to read.
log::uninterruptible_scope_mutex := 0;
v;
};
esac;
};
fun get_from_maildrop (maildrop as MAILDROP { read_q, value } ) # Pure read.
=
{
microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "get_from_maildrop";
log::uninterruptible_scope_mutex := 1;
#
case *value
#
THE v => {
log::uninterruptible_scope_mutex := 0;
v;
};
NULL => {
(call_with_current_fate
(\\ get_v = { rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), get_v));
#
mps::dispatch_next_thread__xu__noreturn ();
}
)) -> v; # When get_v(v) finally gets called, the 'v' winds up here.
wake_remaining_microthreads_waiting_to_read_maildrop__xu (read_q, v); #
v;
};
esac;
};
fun get_from_maildrop' (maildrop as MAILDROP { read_q, value } )
=
itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
where
fun suspend_then_eventually_fire_mailop # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__suspend_then_eventually_fire_mailops__loop # After starting a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
}
=
# This fn gets used in
#
#
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# suspend_then_eventually_fire_mailop ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{
(call_with_current_fate
(\\ get_v
=
{ rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, get_v));
#
return_to__suspend_then_eventually_fire_mailops__loop (); # Return control to mailop.pkg.
impossible (); # return_to__suspend_then_eventually_fire_mailops__loop should never return.
}
)
)
-> v; # Execution will resume on this line when 'get_v(v)' is eventually done.
finish_do1mailoprun(); # Remember that this do_one_mailop[] call has fired a mailop -- no other mailop in call is eligible to fire.
wake_remaining_microthreads_waiting_to_read_maildrop__xu (read_q, v); #
v;
};
fun is_mailop_ready_to_fire () # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
=
case *value
#
NULL => itt::UNREADY_MAILOP suspend_then_eventually_fire_mailop;
#
THE v => {
itt::READY_MAILOP
{
fire_mailop => {. log::uninterruptible_scope_mutex := 0; # Reppy refers to 'fire_mailop' as 'doFn'.
v;
}
};
};
esac;
end;
fun nonblocking_get_from_maildrop (maildrop as MAILDROP { value, ... } )
=
*value;
# Swap the current contents of the cell with a new value.
#
# This function has the effect of an
# get_mail followed by a put_mail,
# except that it is guaranteed to be atomic.
#
# It is also somewhat more efficient.
#
fun maildrop_swap (MAILDROP { read_q, value }, new_v)
=
{
microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "maildrop_swap";
log::uninterruptible_scope_mutex := 1;
#
case *value
#
NULL => { v = call_with_current_fate
(\\ get_v
=
{ rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), get_v));
#
mps::dispatch_next_thread__xu__noreturn ();
}
);
value := THE new_v;
wake_remaining_microthreads_waiting_to_read_maildrop__xu (read_q, new_v); #
v;
};
THE v => { value := THE new_v;
log::uninterruptible_scope_mutex := 0;
v;
};
esac;
};
fun maildrop_swap' (MAILDROP { read_q, value }, new_maildrop_value)
=
itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
where
fun suspend_then_eventually_fire_mailop # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__suspend_then_eventually_fire_mailops__loop # After starting a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
}
=
# This fn gets used in
#
#
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# suspend_then_eventually_fire_mailop ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{ (call_with_current_fate
(\\ fate
=
{ rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, fate));
return_to__suspend_then_eventually_fire_mailops__loop (); # Return control to mailop.pkg.
impossible(); # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
}
)
)
-> v; # Execution picks up on this line when 'get_v(v)' eventually gets called.
finish_do1mailoprun (); # Remember that this do_one_mailop[] call has fired a mailop -- no other mailop in call is eligible to fire.
value := THE new_maildrop_value;
wake_remaining_microthreads_waiting_to_read_maildrop__xu (read_q, new_maildrop_value); #
v;
};
fun is_mailop_ready_to_fire ()
=
case *value
#
NULL => itt::UNREADY_MAILOP suspend_then_eventually_fire_mailop;
THE v => itt::READY_MAILOP
{
fire_mailop => {. value := THE new_maildrop_value; # Reppy refers to 'fire_mailop' as 'doFn'.
log::uninterruptible_scope_mutex := 0;
v;
}
};
esac;
end;
# Convenience fn for starting up imp graphs:
#
fun make_run_gun ()
=
{ run_gun = make_empty_maildrop (): Maildrop(Void);
run_gun' = get_from_maildrop' run_gun; # Build a mailop that will fire when fire_run_gun() is called, to use as a startup barrier.
fun fire_run_gun () = put_in_maildrop (run_gun, ());
{ run_gun', fire_run_gun };
};
# Same as above for shutdown:
#
fun make_end_gun ()
=
{ end_gun = make_empty_maildrop (): Maildrop(Void);
end_gun' = get_from_maildrop' end_gun; # Build a mailop that will fire when fire_run_gun() is called, to use as a startup barrier.
fun fire_end_gun () = put_in_maildrop (end_gun, ());
{ end_gun', fire_end_gun };
};
}; # package maildrop
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.