


## maildrop.pkg # Derives from cml/src/core-cml/sync-var.sml
#
# Maildrops are essentially concurrency-safe replacements for REF cells.
# Compiled by:
#     src/lib/std/standard.lib

### "We're fools whether we dance or not,
### so we might as well dance."
###
### -- Japanese proverb
stipulate
# Short local names for the packages used below.
# Each alias must sit on its own line: a '#' comment runs to end-of-line,
# so anything following it on the same line is not compiled.
#
package fat =  fate;					# fate					is from   src/lib/std/src/nj/fate.pkg
package itt =  internal_threadkit_types;		# internal_threadkit_types		is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
package rwq =  rw_queue;				# rw_queue				is from   src/lib/src/rw-queue.pkg
package mps =  microthread_preemptive_scheduler;	# microthread_preemptive_scheduler	is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
#
call_with_current_fate =  fat::call_with_current_fate;	# Unqualified shorthand used throughout this package.
switch_to_fate         =  fat::switch_to_fate;		# Unqualified shorthand used throughout this package.
herein
package maildrop
:       Maildrop						# Maildrop is from   src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.api
{								# Opening brace kept off the comment line: a '#' comment runs to end-of-line.
exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;		# Raised by put_in_maildrop when the maildrop already holds a value.
#
Fate(X) = fat::Fate(X);					# Local shorthand for the fate type exported by the 'fate' package.
#
# A maildrop is a record of three mutable cells:
#
Maildrop(X) = MAILDROP
{
priority: Ref( Int ),											# Counter advanced by bump_priority when a mailop polls ready; put_in_maildrop sets it to 1 when it wakes a reader.
read_q: rwq::Rw_Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)) ),					# Readers blocked on an empty maildrop: each entry is a status cell plus the fate to resume with the value.
value: Ref( Null_Or(X) )										# NULL while the maildrop is empty, (THE v) while it is full.
};
fun maildrop_to_string (MAILDROP { priority => REF prio, read_q, value })		# Debug support: render maildrop state as text, e.g. for logging.
    =
    {   waiting  =  list::length (rwq::to_list read_q);				# Number of entries currently on the read queue.
        #
        fullness =  case *value
                        #
                        NULL => "EMPTY";
                        _    => "FULL";
                    esac;
        #
        sprintf "prio=%d len(in_q)=%d value is %s" prio waiting fullness;
    };
fun make_cell ()										# Build a fresh, empty maildrop: priority 0, no queued readers, no value.
    =
    MAILDROP
      {
        priority =>  REF 0,
        read_q   =>  rwq::make_rw_queue (),
        value    =>  REF NULL
      };
fun same_cell (MAILDROP { value => left, ... }, MAILDROP { value => right, ... })		# TRUE iff both maildrops share the same 'value' Ref cell.
    =
    left == right;
fun make_do1mailoprun_status ()								# Fresh status cell recording that the current thread is blocked.
    =
    {   current_thread =  mps::get_current_thread ();
        #
        REF (itt::DO1MAILOPRUN_IS_BLOCKED current_thread);
    };
# Flip a BLOCKED status cell to COMPLETE and
# hand back the thread that was recorded in it.
#
fun mark_do1mailoprun_complete_and_return_thread (status as REF (itt::DO1MAILOPRUN_IS_BLOCKED blocked_thread))
        =>
        {   status :=  itt::DO1MAILOPRUN_IS_COMPLETE;
            #
            blocked_thread;
        };

    mark_do1mailoprun_complete_and_return_thread (REF (itt::DO1MAILOPRUN_IS_COMPLETE))
        =>
        raise exception FAIL "Compiler bug: Attempt to cancel already-cancelled transaction-id";	# Not expected to happen; this clause keeps the match exhaustive.
end;
fun bump_priority priority_cell								# Post-increment: add one to the cell, return the value it held before.
    =
    {   old =  *priority_cell;
        #
        priority_cell :=  old + 1;
        #
        old;
    };
# Result type of clean_and_remove: either nothing usable
# was found on the queue, or one queue entry was dequeued.
#
Qy_Item(X)
#
= NO_ITEM								# Queue held no entry that survived cleaning.
| ITEM ((Ref itt::Do1mailoprun_Status, Fate(X)))			# Dequeued entry: status cell plus the fate to resume.
;
# Functions to clean channel input and output queues
#
stipulate
fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! tail)				# Strip the leading run of already-completed entries; stop at the first other entry.
        =>
        clean tail;

    clean remaining
        =>
        remaining;
end;
# Filter out every already-completed entry from the first list,
# pushing survivors onto the accumulator -- so the survivors
# come out in reverse order relative to the input.
#
fun clean_rev ([], acc)
        =>
        acc;

    clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! tail, acc)
        =>
        clean_rev (tail, acc);

    clean_rev (live ! tail, acc)
        =>
        clean_rev (tail, live ! acc);
end;
herein
# Clean the queue and report whether any entry survives:
# returns 0 if none does, otherwise bumps 'priority' and
# returns its previous value (see bump_priority).
#
fun clean_and_check (priority, rwq::RW_QUEUE { front, back } )
    =
    scan_front *front
    where
        fun scan_front []
                =>
                scan_back *back;

            scan_front entries
                =>
                case (clean entries)
                    #
                    []   => scan_back *back;
                    live => {   front := live;
                                #
                                bump_priority priority;
                            };
                esac;
        end

        also
        fun scan_back []
                =>
                0;

            scan_back entries
                =>
                {   back := [];							# Back list is consumed here; survivors (if any) move to the front.
                    #
                    case (clean_rev (entries, []))
                        #
                        []   => 0;
                        live => {   front := live;
                                    #
                                    bump_priority priority;
                                };
                    esac;
                };
        end;
    end;
# Clean the queue and dequeue the first surviving entry.
# Returns (ITEM entry) on success, NO_ITEM if nothing survives.
#
fun clean_and_remove (rwq::RW_QUEUE { front, back, ... } )
    =
    scan_front *front
    where
        fun scan_front []
                =>
                scan_back *back;

            scan_front entries
                =>
                case (clean entries)
                    #
                    []             => scan_back *back;
                    (first ! tail) => {   front := tail;
                                          #
                                          ITEM first;
                                      };
                esac;
        end

        also
        fun scan_back []
                =>
                NO_ITEM;

            scan_back entries
                =>
                {   back := [];							# Back list is consumed here; survivors (if any) move to the front.
                    #
                    case (clean_rev (entries, []))
                        #
                        []           => NO_ITEM;
                        first ! tail => {   front := tail;
                                            #
                                            ITEM first;
                                        };
                    esac;
                };
        end;
    end;
# Clean the queue, then add 'new_entry' behind whatever survives.
#
fun clean_and_enqueue (rwq::RW_QUEUE { front, back, ... }, item)
    =
    scan_front *front
    where
        fun scan_front []
                =>
                scan_back *back;

            scan_front entries
                =>
                case (clean entries)
                    #
                    []   => scan_back *back;
                    live => {   front := live;
                                #
                                back := item ! *back;				# Front has a survivor, so just push onto the back list.
                            };
                esac;
        end

        also
        fun scan_back []
                =>
                front := [item];						# Nothing else queued: the new item becomes the whole front.

            scan_back entries
                =>
                case (clean_rev (entries, []))
                    #
                    []   => {   front := [item];   back := [];       };
                    live => {   back  := [item];   front := live;    };
                esac;
        end;
    end;
end; # stipulate
# When a thread is resumed after being blocked
# on a maildrop or oneshot_maildrop op there may
# be other threads also blocked on the variable.
#
# This function is used to propagate the message
# to all of the threads that are blocked on the
# variable (or until one of them takes the value
# in the mvar case).
#
# It must be called from an atomic region.
# When the readQ is finally empty we leave
# the atomic region.
#
# We must use "clean_and_remove" to get items
# from the readQ in the unlikely event that
# a single thread executes a choice of
# multiple gets on the same variable.
#
# Pass 'msg' to the next surviving reader on read_q, if any.
# If the queue yields no reader, clear log::uninterruptible_scope_mutex instead.
#
fun relay_msg (read_q, msg)
=
case (clean_and_remove read_q)
#
NO_ITEM => log::uninterruptible_scope_mutex := 0;		# Nobody left to wake: end the uninterruptible scope.
ITEM (do1mailoprun_status, fate)
=>
call_with_current_fate
(fn old_fate
=
{ mps::enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread => mark_do1mailoprun_complete_and_return_thread do1mailoprun_status, old_fate };	# Mark the reader's status cell COMPLETE and hand its thread, plus our own fate, to the scheduler.
#
switch_to_fate fate msg; # Resume the dequeued reader; 'msg' becomes the result of the call_with_current_fate it blocked in.
}
);
esac;
fun impossible () =  raise exception FAIL "maildrop: impossible";			# Marks code paths that should be unreachable.
# M-variables:
#
# Create a maildrop holding no value: simply another name for make_cell.
#
make_empty_maildrop
=
make_cell;
fun make_maildrop initial_value								# Create a maildrop that starts out full, holding initial_value.
    =
    MAILDROP
      {
        value    =>  REF (THE initial_value),
        read_q   =>  rwq::make_rw_queue (),
        priority =>  REF 0
      };
same_maildrop = same_cell;		# Maildrop identity test: another name for same_cell, which compares the 'value' Ref cells.
# Store maildrop_value in an empty maildrop.
#
# If a reader is queued on read_q it is resumed immediately with the value.
# Raises MAY_NOT_FILL_ALREADY_FULL_MAILDROP if the maildrop is already full.
#
fun put_in_maildrop (maildrop as MAILDROP { priority, read_q, value }, maildrop_value)
=
{
log::uninterruptible_scope_mutex := 1;		# Begin uninterruptible scope.
#
case *value
#
NULL =>
{ value := THE maildrop_value;
#
case (clean_and_remove read_q)
#
NO_ITEM => {
log::uninterruptible_scope_mutex := 0;		# No reader waiting: just end the uninterruptible scope.
};
ITEM (do1mailoprun_status, fate)
=>
{
call_with_current_fate
#
(fn old_fate
=
{ mps::enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread => mark_do1mailoprun_complete_and_return_thread do1mailoprun_status, old_fate };	# Mark the reader's status cell COMPLETE and hand its thread, plus our own fate, to the scheduler.
#
priority := 1;
switch_to_fate fate maildrop_value; # Resume the blocked reader with the value. The mutex is not cleared on this path; the resumed reader code in this file clears it (directly or via relay_msg).
}
);
};
esac;
};
THE _ =>
{
log::uninterruptible_scope_mutex := 0;		# End the uninterruptible scope before raising.
#
raise exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;
};
esac;
};
# Mailop version of take_from_maildrop: builds a base mailop which,
# when it fires, empties the maildrop and yields the value it held.
#
# NOTE(review): this fn never sets log::uninterruptible_scope_mutex to 1 itself but
# does clear it -- presumably the caller in mailop.pkg sets it before polling; confirm.
#
fun take_from_maildrop' (MAILDROP { priority, read_q, value } )
=
itt::BASE_MAILOPS [is_mailop_ready_to_fire]
where
fun set_up_mailopready_watch # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # This typically does do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and then sends nacks as appropriate.
return_to__set_up_mailopready_watches__loop # After starting up a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
}
=
# This fn gets used in
#
# src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# set_up_mailopready_watch ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{
(call_with_current_fate
(fn fate
=
{ rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, fate));		# Queue ourselves as a reader waiting for a value.
#
return_to__set_up_mailopready_watches__loop (); # Return control to mailop.pkg
impossible (); # return_to__set_up_mailopready_watches__loop() should never return.
}
)
)
-> maildrop_value; # Execution will pick up here when put_in_maildrop (above) or relay_msg eventually does: switch_to_fate fate <value>;
finish_do1mailoprun (); # Do stuff like do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
value := NULL; # Empty the maildrop.
log::uninterruptible_scope_mutex := 0; # End uninterruptible scope.
maildrop_value; # Return value read from maildrop.
};
fun is_mailop_ready_to_fire () # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'
=
case *value
#
NULL => itt::MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch;		# Empty: caller must block via set_up_mailopready_watch.
#
THE v => itt::MAILOP_IS_READY_TO_FIRE
{
priority => bump_priority priority,
fire_mailop => .{ value := NULL; # Reppy refers to 'fire_mailop' as 'doFn'.  Empty the maildrop,
log::uninterruptible_scope_mutex := 0;		# end the uninterruptible scope,
v;						# and yield the value seen at poll time.
}
};
esac;
end;
# Non-blocking take:
#
#   o If the maildrop is full, empty it and return (THE value).
#   o If the maildrop is empty, return NULL immediately.
#
# Both paths must clear log::uninterruptible_scope_mutex before
# returning; otherwise the uninterruptible scope entered at the
# top of the fn would never be ended on the empty-maildrop path.
#
fun nonblocking_take_from_maildrop (MAILDROP { priority, read_q, value } )
    =
    {
        log::uninterruptible_scope_mutex := 1;					# Begin uninterruptible scope.
        #
        case *value
            #
            THE v => {   value := NULL;						# Empty the maildrop.
                         log::uninterruptible_scope_mutex := 0;			# End uninterruptible scope.
                         THE v;
                     };

            NULL  => {   log::uninterruptible_scope_mutex := 0;			# End uninterruptible scope on the empty path too.
                         NULL;
                     };
        esac;
    };
# Blocking take: return the maildrop's value, leaving the maildrop empty.
# If the maildrop is empty, the calling thread queues itself on read_q
# and gives up the processor until a value is delivered to its fate.
#
fun take_from_maildrop (maildrop as MAILDROP { priority, read_q, value } )
=
{
log::uninterruptible_scope_mutex := 1;		# Begin uninterruptible scope.
#
case *value
#
NULL =>
{
v = call_with_current_fate
(fn fate
=
{
rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), fate));		# Queue ourselves as a blocked reader.
#
mps::dispatch_next_thread__usend__noreturn ();						# Run some other thread; we continue below when 'fate' is resumed with a value.
}
);
value := NULL;					# We took the value, so the maildrop is empty again.
log::uninterruptible_scope_mutex := 0;		# put_in_maildrop resumes us without clearing the mutex, so clear it here.
v;
};
THE v =>
{
value := NULL;					# Empty the maildrop.
log::uninterruptible_scope_mutex := 0;		# End uninterruptible scope.
v;
};
esac;
};
# Blocking peek: return the maildrop's value without emptying the maildrop.
# If the maildrop is empty, the calling thread queues itself on read_q
# and gives up the processor until a value is delivered to its fate.
#
fun peek_in_maildrop (maildrop as MAILDROP { priority, read_q, value } )
=
{
log::uninterruptible_scope_mutex := 1;		# Begin uninterruptible scope.
#
case *value
#
NULL => {
v = call_with_current_fate
(fn fate
=
{
rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), fate));		# Queue ourselves as a blocked reader.
#
mps::dispatch_next_thread__usend__noreturn ();						# Run some other thread; we continue below when 'fate' is resumed with a value.
}
);
relay_msg (read_q, v);				# We did not consume the value, so pass it on to the next queued reader; relay_msg clears the mutex when no reader is left.
v;
};
THE v => {
log::uninterruptible_scope_mutex := 0;		# End uninterruptible scope; the value stays in the maildrop.
v;
};
esac;
};
# Mailop version of peek_in_maildrop: builds a base mailop which,
# when it fires, yields the maildrop's value without emptying the maildrop.
#
# NOTE(review): this fn never sets log::uninterruptible_scope_mutex to 1 itself but
# does clear it -- presumably the caller in mailop.pkg sets it before polling; confirm.
#
fun peek_in_maildrop' (maildrop as MAILDROP { priority, read_q, value } )
=
itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
where
fun set_up_mailopready_watch # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__set_up_mailopready_watches__loop # After starting a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
}
=
# This fn gets used in
#
# src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# set_up_mailopready_watch ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{
(call_with_current_fate
(fn fate
=
{
rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, fate));		# Queue ourselves as a reader waiting for a value.
return_to__set_up_mailopready_watches__loop (); # Return control to mailop.pkg.
impossible (); # return_to__set_up_mailopready_watches__loop should never return.
}
)
)
-> v; # Execution will resume on this line when 'fate' is eventually called.
finish_do1mailoprun();
relay_msg (read_q, v);		# Peeking does not consume the value, so pass it on to the next queued reader; relay_msg clears the mutex when no reader is left.
v;
};
fun is_mailop_ready_to_fire () # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
=
case *value
#
NULL => {
itt::MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch;		# Empty: caller must block via set_up_mailopready_watch.
};
#
THE v => {
itt::MAILOP_IS_READY_TO_FIRE
{
priority => bump_priority priority,
#
fire_mailop => .{ log::uninterruptible_scope_mutex := 0; # Reppy refers to 'fire_mailop' as 'doFn'.  End the uninterruptible scope; the maildrop stays full.
v;
}
};
};
esac;
end;
# Non-blocking peek: report the maildrop's current contents
# -- NULL if empty, (THE value) if full -- without changing them.
#
fun nonblocking_peek_in_maildrop (maildrop as MAILDROP { priority, read_q, value } )
    =
    {   log::uninterruptible_scope_mutex := 1;					# Begin uninterruptible scope.
        #
        case *value
            #
            NULL     => {   log::uninterruptible_scope_mutex := 0;		# End uninterruptible scope.
                            NULL;
                        };

            contents => {   log::uninterruptible_scope_mutex := 0;		# End uninterruptible scope.
                            #
                            contents;						# This is (THE v): the NULL case was handled above.
                        };
        esac;
    };
# Swap the current contents of the cell with a new value.
#
# This function has the effect of a
# take_from_maildrop followed by a put_in_maildrop,
# except that it is guaranteed to be atomic.
#
# It is also somewhat more efficient.
#
# Blocking swap: store new_v in the maildrop and return the value it replaced.
# If the maildrop is empty, the calling thread queues itself on read_q
# and gives up the processor until a value is delivered to its fate.
#
fun maildrop_swap (MAILDROP { priority, read_q, value }, new_v)
=
{
log::uninterruptible_scope_mutex := 1;		# Begin uninterruptible scope.
#
case *value
#
NULL => { v = call_with_current_fate
(fn fate
=
{ rwq::put_on_back_of_queue (read_q, (make_do1mailoprun_status(), fate));		# Queue ourselves as a blocked reader.
#
mps::dispatch_next_thread__usend__noreturn ();						# Run some other thread; we continue below when 'fate' is resumed with a value.
}
);
value := THE new_v;				# Replace the delivered value with ours.
relay_msg (read_q, new_v); # Relay the new value to any other blocked threads; relay_msg clears the mutex when no reader is left.
v;						# Return the value that was delivered to us.
};
THE v => { value := THE new_v;			# Overwrite in place.
log::uninterruptible_scope_mutex := 0;		# End uninterruptible scope.
v;						# Return the previous contents.
};
esac;
};
# Mailop version of maildrop_swap: builds a base mailop which, when it fires,
# stores new_maildrop_value in the maildrop and yields the value it replaced.
#
# NOTE(review): this fn never sets log::uninterruptible_scope_mutex to 1 itself but
# does clear it -- presumably the caller in mailop.pkg sets it before polling; confirm.
#
fun maildrop_swap' (MAILDROP { priority, read_q, value }, new_maildrop_value)
=
itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]
where
fun set_up_mailopready_watch # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__set_up_mailopready_watches__loop # After starting a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
}
=
# This fn gets used in
#
# src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# set_up_mailopready_watch ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{ (call_with_current_fate
(fn fate
=
{ rwq::put_on_back_of_queue (read_q, (do1mailoprun_status, fate));		# Queue ourselves as a reader waiting for a value.
return_to__set_up_mailopready_watches__loop (); # Return control to mailop.pkg.
impossible(); # return_to__set_up_mailopready_watches__loop() should never return.
}
)
)
-> old_maildrop_value; # Execution picks up on this line when 'fate' eventually gets called.
finish_do1mailoprun ();
value := THE new_maildrop_value;		# Replace the delivered value with ours.
relay_msg (read_q, new_maildrop_value);		# Pass the new value on to the next queued reader; relay_msg clears the mutex when no reader is left.
old_maildrop_value;				# Return the value that was delivered to us.
};
fun is_mailop_ready_to_fire ()			# Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
=
case *value
#
NULL => itt::MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch;		# Empty: caller must block via set_up_mailopready_watch.
THE v => itt::MAILOP_IS_READY_TO_FIRE
{
priority => bump_priority priority,
#
fire_mailop => .{ value := THE new_maildrop_value; # Reppy refers to 'fire_mailop' as 'doFn'.  Overwrite the contents,
log::uninterruptible_scope_mutex := 0;		# end the uninterruptible scope,
v;						# and yield the value seen at poll time.
}
};
esac;
end;
}; # package maildrop
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2013,
## released per terms of SMLNJ-COPYRIGHT.


