## mailslot.pkg # Derives from cml/src/core-cml/channel.sml
#
# Mailslots implement synchronous message sends -- both
# threads must be ready before either can proceed.
#
# Despite being the most basic mail exchange mechanism,
# mailslots have the most intricate implementation,
# precisely because they involve a two-thread RENDEZVOUS
# rather than just two threads doing different things
# at different times.
#
# The rendezvous aspect means that the implementation
# is crucially about two threads doing something
# "at the same time". (By the time you really understand
# this package, you will have a very good grasp of using
# call_with_current_fate ("call/cc") in a practical setting!)
#
# In practice, in our current timeslicing implementation,
# we never have two microthreads running at the same time,
# so what has to actually happen is that the first thread
# to arrive at the mailslot blocks and gets put on one of
# its wait queues; when the second thread arrives the
# message transfer takes place and both threads are then
# free to proceed. In practice one will exit actually running,
# while the other will be moved from the mailslot wait queue
# to the microthread_preemptive_scheduler run queue.
#
#
# "To ensure that we always leave the atomic region exactly once, we
# require that the blocking operation be responsible for leaving the
# atomic region (in the mailop case, it must also execute the clean-up
# action). The fire_mailop fn always transfers control to the blocked thread
# without leaving the atomic region. Note that the give (and give')
# suspend_then_eventually_fire_mailop()s run using the receiver's thread state."
# -- John H Reppy
# Compiled by:
#     src/lib/std/standard.lib

###              "Two men enter -- one man leaves."
###
###                        -- Mad Max: Beyond Thunderdome
stipulate
    # Short local aliases for the packages used below.
    # Each trailing comment names the file that defines the package.
    #
    package fat =  fate;					# fate					is from   src/lib/std/src/nj/fate.pkg
    package mop =  mailop;					# mailop				is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
    package rwq =  rw_queue;					# rw_queue				is from   src/lib/src/rw-queue.pkg
    package itt =  internal_threadkit_types;			# internal_threadkit_types		is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package mps =  microthread_preemptive_scheduler;		# microthread_preemptive_scheduler	is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    Fate(X) =  fat::Fate(X);
    #
    # Unqualified names for the two fate ("continuation") primitives
    # that the rendezvous code below is built on:
    #
    call_with_current_fate =  fat::call_with_current_fate;
    switch_to_fate         =  fat::switch_to_fate;
herein
package mailslot: (weak)  api {
	#
	# The mailop type, declared here so it is nameable via this package;
	# the package body defines it as mop::Mailop(X).
	#
	Mailop(X);
	#
	include api Mailslot;					# Mailslot	is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailslot.api
	#
	# Extra entry point beyond the Mailslot api:
	# empties both wait queues of the given mailslot.
	#
	reset_mailslot:  Mailslot(X) -> Void;
    }
{
# Our mailop type is exactly the one defined by the mailop package (aliased as 'mop').
Mailop(X) = mop::Mailop(X);
# Small local queue helper, kept here (rather than
# calling into rw_queue) to improve performance.
#
# Pushes 'entry' onto the 'back' list of the queue.
# The 'back' list is held newest-first, so this is a simple cons.
#
fun enqueue (rwq::RW_QUEUE { back, ... }, entry)
    =
    {   queued_so_far =  *back;
	#
	back :=  entry ! queued_so_far;
    };
# A mailslot is a pair of wait queues, one per direction of the rendezvous.
#
#   in_q:   Entries enqueued by blocked 'take' operations.  Each entry pairs
#           a do1mailoprun status refcell with a fate expecting the message (X).
#
#   out_q:  Entries enqueued by blocked 'put' operations.  Each entry pairs
#           a do1mailoprun status refcell with a fate expecting the taker's
#           (thread, fate) pair, so that the giver can then deliver the message.
#
Mailslot(X)
=
MAILSLOT
{ in_q: rwq::Rw_Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)) ),
out_q: rwq::Rw_Queue( (Ref( itt::Do1mailoprun_Status ), Fate( (itt::Microthread, Fate(X)) )) )
}; # The above two should probably be records not tuples. XXX SUCKO FIXME.
# Debug support: render the current lengths of both wait queues as text,
# primarily so mailslot state can be logged via log::note or such.
#
fun mailslot_to_string (MAILSLOT { in_q, out_q })
    =
    {   takers_waiting =  queue_length  in_q;
	givers_waiting =  queue_length  out_q;
	#
	sprintf "len(in_q)=%d len(out_q)=%d"  takers_waiting  givers_waiting;
    }
    where
	# Number of entries currently held in a queue.
	#
	fun queue_length q
	    =
	    list::length (rwq::to_list q);
    end;
# Discard everything queued on the mailslot:
# both the 'take' side and the 'put' side wait queues are emptied.
#
fun reset_mailslot (MAILSLOT { in_q => takers, out_q => givers })
    =
    {   rwq::clear_queue_to_empty  takers;
	rwq::clear_queue_to_empty  givers;
    };
# Create a fresh mailslot with two newly made (distinct) wait queues.
#
fun make_mailslot ()
    =
    {   takers =  rwq::make_rw_queue ();
	givers =  rwq::make_rw_queue ();
	#
	MAILSLOT { in_q => takers, out_q => givers };
    };
# (Mailslot(X), Mailslot(X)) -> Bool
#
# Two mailslots are the same mailslot iff rwq::same_queue
# says their in_q queues are the same queue.
#
fun same_mailslot
	( MAILSLOT { in_q => left_in_q,  ... },
	  MAILSLOT { in_q => right_in_q, ... }
	)
    =
    rwq::same_queue (left_in_q, right_in_q);
# Make a fresh status refcell in the BLOCKED state,
# recording the currently running microthread in it.
#
fun make__mailop_done__refcell ()
    =
    {   blocked_thread =  mps::get_current_microthread ();
	#
	REF (itt::DO1MAILOPRUN_IS_BLOCKED blocked_thread);
    };
# Flip a BLOCKED status refcell to COMPLETE and
# return the thread that was recorded in it.
#
# Raises DIE if the refcell is already COMPLETE.
#
fun end_do1mailoprun_and_return_thread  do1mailoprun_status
    =
    case (*do1mailoprun_status)
	#
	itt::DO1MAILOPRUN_IS_BLOCKED thread_id
	    =>
	    {   do1mailoprun_status :=  itt::DO1MAILOPRUN_IS_COMPLETE;
		#
		thread_id;
	    };

	itt::DO1MAILOPRUN_IS_COMPLETE
	    =>
	    raise exception DIE "Compiler bug: Attempt to cancel already-cancelled transaction-id";		# Never happens; here to keep the match exhaustive.
    esac;
# Given a do1mailoprun_status refcell:
#   o  mark it complete, and
#   o  make the thread recorded in it the scheduler's current microthread.
#
fun set_current_microthread do1mailoprun_status
    =
    {   blocked_thread =  end_do1mailoprun_and_return_thread  do1mailoprun_status;
	#
	mps::set_current_microthread  blocked_thread;
    };
# Result type of clean_and_remove:
#   NO_ITEM   -- the queue held no live (non-complete) entry;
#   ITEM e    -- 'e' is the (status refcell, fate) entry removed from the queue.
#
Queue_Item(X)
= NO_ITEM
| ITEM (Ref(itt::Do1mailoprun_Status), Fate(X))
; # ITEM should probably host a record not a tuple. XXX SUCKO FIXME.
# Functions to clean slot input and output queues
#
stipulate
# Drop the leading run of already-complete entries from a list.
# Stops at the first entry that is not complete (or at the end of the list);
# entries after that point are not examined.
#
fun clean entries
    =
    case entries
	#
	(REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! remaining
	    =>
	    clean remaining;

	_   =>
	    entries;
    esac;
# Walk the whole first list, dropping every complete entry and pushing
# each surviving entry onto the front of 'survivors'.  The survivors thus
# come out in reverse order, stacked on top of the initial second argument.
#
fun clean_rev ([], survivors)
	=>
	survivors;

    clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! remaining, survivors)
	=>
	clean_rev (remaining, survivors);						# Complete entry: skip it.

    clean_rev (live_entry ! remaining, survivors)
	=>
	clean_rev (remaining, live_entry ! survivors);
end;
# Remove every complete entry from a list, preserving the
# relative order of the surviving entries.
#
fun clean_all entries
    =
    restore_order (clean_rev (entries, []), [])						# clean_rev yields survivors reversed; flip them back.
    where
	fun restore_order ([], in_order)
		=>
		in_order;

	    restore_order (entry ! remaining, in_order)
		=>
		restore_order (remaining, entry ! in_order);
	end;
    end;
herein
# Discard complete entries from the head of the queue and report
# whether a live entry is available:
#
#   TRUE   -- 'front' now starts with a live entry;
#   FALSE  -- no live entry was found.
#
# The 'back' list is only consulted (and then emptied, with its
# survivors moved to 'front') when 'front' yields no live entry.
#
fun clean_and_check (rwq::RW_QUEUE { front, back } )
    =
    scan_front *front
    where
	# Called when 'front' had nothing live:
	# move the live entries of 'back' (if any) to 'front'.
	#
	fun promote_back []
		=>
		FALSE;

	    promote_back pending
		=>
		{   back := [];
		    #
		    case (clean_rev (pending, []))
			#
			[]   => FALSE;
			#
			live => {   front := live;
				    TRUE;
				};
		    esac;
		};
	end;

	fun scan_front entries
	    =
	    case (clean entries)
		#
		[]         => promote_back *back;
		#
		live_front => {   front := live_front;
				  TRUE;
			      };
	    esac;
    end;
# Discard complete entries from the head of the queue, then dequeue
# and return the first live entry:
#
#   ITEM entry  -- 'entry' was removed from the queue;
#   NO_ITEM     -- no live entry was found.
#
# The 'back' list is only consulted (and then emptied, with its
# remaining survivors moved to 'front') when 'front' yields no live entry.
#
fun clean_and_remove (rwq::RW_QUEUE { front, back, ... } )
    =
    scan_front *front
    where
	# Called when 'front' had nothing live:
	# take the oldest live entry of 'back' (if any) and
	# leave the remaining survivors in 'front'.
	#
	fun promote_back []
		=>
		NO_ITEM;

	    promote_back pending
		=>
		{   back := [];
		    #
		    case (clean_rev (pending, []))
			#
			[]  => NO_ITEM;
			#
			(first ! remaining)
			    =>
			    {   front := remaining;
				ITEM first;
			    };
		    esac;
		};
	end;

	fun scan_front entries
	    =
	    case (clean entries)
		#
		[]  => promote_back *back;
		#
		(first ! remaining)
		    =>
		    {   front := remaining;
			#
			ITEM first;
		    };
	    esac;
    end;
# Purge every complete entry from the queue and append 'new_entry' at its tail.
#
# If no live entry remains in 'front', everything live (plus the new entry)
# is consolidated into 'front' in oldest-first order and 'back' is emptied;
# otherwise the new entry is consed onto the (purged) newest-first 'back' list.
#
fun clean_and_enqueue (rwq::RW_QUEUE { front, back, ... }, new_entry)
    =
    {   live_front =  clean_all *front;
	#
	case live_front
	    #
	    []  =>  {   front := clean_rev (*back, [new_entry]);
			back  := [];
		    };
	    #
	    _   =>  {   front := live_front;
			back  := new_entry ! clean_all *back;
		    };
	esac;
    };
end; # stipulate
# Marks control paths which should be unreachable: always raises DIE.
# Used after calls which are expected never to return.
#
fun impossible () =   raise exception DIE "Slot: impossible";
# Blocking (synchronous) send of 'msg' through the mailslot.
#
# First asserts that we are not already in uninterruptible scope, then sets
# log::uninterruptible_scope_mutex to 1.  This function never clears that flag
# itself.  (Per the Reppy quote in the file header, the blocking side is
# responsible for leaving the atomic region.)
#
# What happens next depends on whether a live 'take' entry is waiting on in_q:
#
#   ITEM:     A taker is already blocked.  We capture our own fate, hand it plus
#             the taker's thread to the scheduler, and jump directly into the
#             taker's fate with 'msg'.
#
#   NO_ITEM:  Nobody is waiting.  We enqueue (status refcell, our fate) on out_q
#             and dispatch another thread.  When a taker later revives our fate
#             it passes us its (thread, fate) pair, and we complete the transfer
#             via mps::switch_to_thread__xu.
#
# NOTE(review): the 'mailslot' name bound by 'mailslot as' is not referenced in the body.
#
fun put_in_mailslot (mailslot as MAILSLOT { in_q, out_q }, msg) # Derives from Reppy's send()
=
{
microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "put_in_mailslot";
log::uninterruptible_scope_mutex := 1;
#
case (clean_and_remove in_q)
#
ITEM (do1mailoprun_status, take_fateq)
=>
call_with_current_fate
(\\ give_fatep = { mps::enqueue_old_thread_plus_old_fate_then_install_new_thread # 'give' thread doesn't need to block, just yield to waiting 'take' thread.
{
new_thread => end_do1mailoprun_and_return_thread do1mailoprun_status, # Marks the taker's status refcell complete and yields the taker's thread.
old_fate => give_fatep
};
#
switch_to_fate take_fateq msg; # take_fateq_revival_point. This will canonically jump to take_fateq_resumption_point below.
}
);
NO_ITEM
=>
{
(call_with_current_fate
(\\ give_fateq # give_fateq will canonically be revived at give_fateq_revival_point below.
=
{ enqueue (out_q, (make__mailop_done__refcell(), give_fateq)); # refcell saves give_thread.
#
mps::dispatch_next_thread__xu__noreturn (); # We are now blocked: run some other thread.
}
)
) -> (take_thread, take_fatep); # give_fateq_resumption_point: When above give_fateq is eventually invoked we'll wind up here.
mps::switch_to_thread__xu
(take_thread, take_fatep, msg); # take_fatep_revival_point
};
esac;
};
# Mailop (event-valued) form of put_in_mailslot:  returns a base mailop which,
# when eventually run, sends 'msg' through the mailslot.
#
# The mailop is built from three local functions:
#
#   is_mailop_ready_to_fire             Checks in_q for a live waiting taker and reports
#                                       READY (with fire_mailop) or UNREADY (with the suspend fn).
#   fire_mailop                         Completes the rendezvous with an already-waiting taker.
#   suspend_then_eventually_fire_mailop Parks the giver on out_q until a taker shows up.
#
fun put_in_mailslot' (MAILSLOT { in_q, out_q }, msg)						# Derives from Reppy's sendEvt().
    =
    itt::BASE_MAILOPS [is_mailop_ready_to_fire]
    where
	# Only reached via READY_MAILOP below, i.e. after clean_and_check
	# has reported a live entry at the front of in_q.
	#
	fun fire_mailop ()									# Reppy refers to 'fire_mailop' as 'doFn'.
	    =
	    {   (the (rwq::take_from_front_of_queue in_q))
		    ->
		    (do1mailoprun_status, take_fate);

		call_with_current_fate
		    (\\ give_fate
			=
			{   mps::enqueue_old_thread_plus_old_fate_then_install_new_thread  { new_thread => end_do1mailoprun_and_return_thread do1mailoprun_status,  old_fate => give_fate };
			    #
			    switch_to_fate  take_fate  msg;					# Deliver the message by jumping into the taker's fate.
			}
		    );
	    };

	fun suspend_then_eventually_fire_mailop							# Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
	      {
		do1mailoprun_status,								# 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
		finish_do1mailoprun,								# Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
		return_to__suspend_then_eventually_fire_mailops__loop				# After starting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
	      }
	    =
	    # This fn gets used in
	    #
	    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
	    #
	    # when a
	    #
	    #     do_one_mailop [ ... ]
	    #
	    # call has no mailops ready to fire.  'do_one_mailop' must then block until
	    # at least one mailop is ready to fire.  It does this by calling the
	    #
	    #     suspend_then_eventually_fire_mailop ()
	    #
	    # fn on each mailop in the list; each such call will typically
	    # make an entry in one or more run queues of blocked threads.
	    #
	    # The first mailop to fire cancels the rest by doing
	    #
	    #     do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
	    #
	    {
		(call_with_current_fate
		    (\\ give_fate
			=
			{   clean_and_enqueue (out_q, (do1mailoprun_status, give_fate));	# Park ourselves on the givers' queue, purging stale entries as we go.
			    #
			    return_to__suspend_then_eventually_fire_mailops__loop ();		# Return control to mailop.pkg.
			    impossible ();							# return_to__suspend_then_eventually_fire_mailops__loop() should never return.
			}
		    )
		)
		    -> (take_thread, take_fate);						# Execution will pick up on this line when 'give_fate' is eventually called.

		finish_do1mailoprun ();

		mps::switch_to_thread__xu (take_thread, take_fate, msg);			# Hand 'msg' to the taker which revived us.
	    };

	fun is_mailop_ready_to_fire ()
	    =
	    case (clean_and_check in_q)
		#
		FALSE =>  itt::UNREADY_MAILOP  suspend_then_eventually_fire_mailop;
		#
		TRUE  =>  itt::READY_MAILOP  { fire_mailop };
	    esac;
    end;
# Non-blocking send:  deliver 'msg' only if a taker is already waiting.
#
# Returns TRUE  if a live 'take' entry was found on in_q and the message was handed to it;
# returns FALSE (after resetting log::uninterruptible_scope_mutex to 0) if nobody was waiting.
# out_q is never touched: this function never parks the caller on the mailslot.
#
# In the TRUE case the flag is not reset by this function.
#
fun nonblocking_put_in_mailslot (MAILSLOT { in_q, out_q }, msg) # Derives from Reppy's sendPoll().
=
call_with_current_fate
(
\\ give_fate # Is this 'give_fate' really simply discarded?   Yes, as far as this body shows: it is never referenced, and is shadowed by the inner 'give_fate' below.
=
{
microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "nonblocking_put_in_mailslot";
log::uninterruptible_scope_mutex := 1;
#
case (clean_and_remove in_q)
#
ITEM (rid, take_fate) # 'rid' is the waiting taker's status refcell.
=>
{ call_with_current_fate
(
\\ give_fate
=
{ mps::enqueue_old_thread_plus_old_fate_then_install_new_thread
{
new_thread => end_do1mailoprun_and_return_thread rid,
old_fate => give_fate
};
#
switch_to_fate take_fate msg; # NB: switch_to_fate never returns.
}
);
TRUE; # Reached once the inner 'give_fate' is eventually resumed.
};
NO_ITEM
=>
{ log::uninterruptible_scope_mutex := 0;
#
FALSE;
};
esac;
}
);
# Blocking (synchronous) receive:  returns the message handed over by a giver.
#
# First asserts that we are not already in uninterruptible scope, then sets
# log::uninterruptible_scope_mutex to 1.
#
# What happens next depends on whether a live 'put' entry is waiting on out_q:
#
#   ITEM:     A giver is already blocked.  We make the giver's thread the current
#             thread and jump into the giver's fate, passing it our own
#             (thread, fate) pair.  The giver then delivers the message to our
#             fate.  This branch does not reset the flag itself.
#
#   NO_ITEM:  Nobody is waiting.  We enqueue (status refcell, our fate) on in_q
#             and dispatch another thread.  When a giver later jumps into our
#             fate with the message, we reset the flag to 0 and return the message.
#
# NOTE(review): the 'mailslot' name bound by 'mailslot as' is not referenced in the body.
#
fun take_from_mailslot (mailslot as MAILSLOT { in_q, out_q } ) # Derives from Reppy's recv().
=
{
microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "take_from_mailslot";
log::uninterruptible_scope_mutex := 1;
#
case (clean_and_remove out_q)
#
ITEM (do1mailoprun_status, give_fateq)
=>
{
take_thread = mps::get_current_microthread (); # Remember who we are before switching the current thread to the giver.
#
set_current_microthread do1mailoprun_status; # Make 'give' thread the current thread.
(call_with_current_fate
(
\\ take_fatep
=
switch_to_fate give_fateq (take_thread, take_fatep) # give_fateq_revival_point: this is canonically a jump to give_fateq_resumption_point above.
# The fate passed here must NOT exit UNINTERRUPTIBLE MODE because take_thread_revival_point
# invokes it via mps::switch_to_thread__xu.
)
) -> result; # take_fatep_resumption_point: take_fatep above will resume here once revived.
result;
};
NO_ITEM
=>
{
(call_with_current_fate
(
\\ take_fateq
=
{ do1mailoprun_status = make__mailop_done__refcell(); # Status refcell records the current (taker) thread.
#
enqueue (in_q, (do1mailoprun_status, take_fateq)); # take_fateq will canonically be revived at take_fateq_revival_point above.
# The fate enqueued here needs to EXIT UNINTERRUPTIBLE MODE
#
mps::dispatch_next_thread__xu__noreturn (); # We are now blocked: run some other thread.
}
)
) -> result; # take_fateq_resumption_point: take_fateq above will resume here once revived.
log::uninterruptible_scope_mutex := 0;
result;
};
esac;
};
# Mailop (event-valued) form of take_from_mailslot:  returns a base mailop which,
# when eventually run, receives a message from the mailslot.
#
# The mailop is built from three local functions:
#
#   is_mailop_ready_to_fire             Checks out_q for a live waiting giver and reports
#                                       READY (with fire_mailop) or UNREADY (with the suspend fn).
#   fire_mailop                         Completes the rendezvous with an already-waiting giver.
#   suspend_then_eventually_fire_mailop Parks the taker on in_q until a giver shows up.
#
fun take_from_mailslot' (MAILSLOT { in_q, out_q } )						# Derives from Reppy's recvEvt().
    =
    itt::BASE_MAILOPS [is_mailop_ready_to_fire]
    where
	# Only reached via READY_MAILOP below, i.e. after clean_and_check
	# has reported a live entry at the front of out_q.
	#
	fun fire_mailop ()									# Reppy refers to 'fire_mailop' as 'doFn'.
	    =
	    {   (rwq::take_from_front_of_queue_or_raise_exception out_q)
		    ->
		    (do1mailoprun_status, give_fate);

		my_id =  mps::get_current_microthread ();					# Remember who we are before switching the current thread to the giver.

		set_current_microthread  do1mailoprun_status;					# Mark the giver's status complete; make the giver the current thread.

		call_with_current_fate
		    #
		    (\\ take_fate
			=
			switch_to_fate  give_fate  (my_id, take_fate)				# Revive the giver, telling it where to deliver the message.
		    );
	    };

	fun suspend_then_eventually_fire_mailop							# Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
	      {
		do1mailoprun_status,								# 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
		finish_do1mailoprun,								# Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
		return_to__suspend_then_eventually_fire_mailops__loop				# After starting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
	      }
	    =
	    # This fn gets used in
	    #
	    #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
	    #
	    # when a
	    #
	    #     do_one_mailop [ ... ]
	    #
	    # call has no mailops ready to fire.  'do_one_mailop' must then block until
	    # at least one mailop is ready to fire.  It does this by calling the
	    #
	    #     suspend_then_eventually_fire_mailop ()
	    #
	    # fn on each mailop in the list; each such call will typically
	    # make an entry in one or more run queues of blocked threads.
	    #
	    # The first mailop to fire cancels the rest by doing
	    #
	    #     do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
	    #
	    {   (call_with_current_fate
		    (
		      \\ take_fate
			=
			{   clean_and_enqueue (in_q, (do1mailoprun_status, take_fate));		# Park ourselves on the takers' queue, purging stale entries as we go.
			    #
			    return_to__suspend_then_eventually_fire_mailops__loop ();		# Return control to mailop.pkg.
			    impossible ();							# return_to__suspend_then_eventually_fire_mailops__loop () should never return.
			}
		    )
		)
		    -> msg;									# Execution will pick up on this line when 'take_fate' is eventually called.

		finish_do1mailoprun ();

		log::uninterruptible_scope_mutex := 0;						# The blocked side resets the flag once it has been revived with the message.

		msg;
	    };

	fun is_mailop_ready_to_fire ()
	    =
	    case (clean_and_check out_q)
		#
		FALSE =>  itt::UNREADY_MAILOP  suspend_then_eventually_fire_mailop;
		TRUE  =>  itt::READY_MAILOP  { fire_mailop };
	    esac;
    end;
# Non-blocking receive:  accept a message only if a giver is already waiting.
#
# Returns THE msg  if a live 'put' entry was found on out_q;
# returns NULL (after resetting log::uninterruptible_scope_mutex to 0) if nobody was waiting.
# in_q is never touched: this function never parks the caller on the mailslot.
#
# In the THE case the flag is not reset by this function.
#
fun nonblocking_take_from_mailslot (MAILSLOT { in_q, out_q } ) # Derives from Reppy's recvPoll().
=
{
microthread_preemptive_scheduler::assert_not_in_uninterruptible_scope "nonblocking_take_from_mailslot";
log::uninterruptible_scope_mutex := 1;
#
case (clean_and_remove out_q)
#
ITEM (do1mailoprun_status, give_fate)
=>
THE (call_with_current_fate
(\\ take_fate
=
{ my_id = mps::get_current_microthread (); # Remember who we are before switching the current thread to the giver.
#
set_current_microthread do1mailoprun_status; # Mark the giver's status complete and make the giver the current thread.
switch_to_fate give_fate (my_id, take_fate); # Revive the giver, telling it where to deliver the message.
}
) );
NO_ITEM
=>
{ log::uninterruptible_scope_mutex := 0;
#
NULL;
};
esac;
};
};
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.