## oneshot-maildrop.pkg
#
# The implementation set-once maildrops.
# Compiled by:
#
src/lib/std/standard.lib### "We're fools whether we dance or not,
### so we might as well dance."
###
### -- Japanese proverb
stipulate
package fat = fate; # fate is from
src/lib/std/src/nj/fate.pkg package itt = internal_threadkit_types; # internal_threadkit_types is from
src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg package rwq = rw_queue; # rw_queue is from
src/lib/src/rw-queue.pkg package mps = microthread_preemptive_scheduler; # microthread_preemptive_scheduler is from
src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg #
Fate(X) = fat::Fate(X);
#
call_with_current_fate = fat::call_with_current_fate;
switch_to_fate = fat::switch_to_fate;
herein
package oneshot_maildrop
: Oneshot_Maildrop # Oneshot_Maildrop is from
src/lib/src/lib/thread-kit/src/core-thread-kit/oneshot-maildrop.api {
# We use the same underlying representation
# for both oneshots and maildrops:
#
Oneshot_Maildrop(X)
=
ONESHOT { read_q: rwq::Rw_Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)) ),
value: Ref( Null_Or(X) )
};
exception MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP;
#
fun make_cell ()
=
ONESHOT { value => REF NULL,
read_q => rwq::make_rw_queue ()
};
#
fun same_cell (ONESHOT { value=>v1, ... }, ONESHOT { value=>v2, ... } )
=
v1 == v2;
#
fun make_transaction_id ()
=
REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_microthread()));
#
fun mark_do1mailoprun_complete_and_return_thread (do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread_id))
=>
{ do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
#
thread_id;
};
mark_do1mailoprun_complete_and_return_thread (REF (itt::DO1MAILOPRUN_IS_COMPLETE))
=>
raise exception DIE "Compiler bug: Attempt to cancel already-cancelled transaction-id"; # Never happens; here to suppress 'nonexhaustive match' compile warning.
end;
Qy_Item(X)
= NO_ITEM
| ITEM ((Ref(itt::Do1mailoprun_Status), Fate(X)) )
; # ITEM should probably host a record not a tuple. XXX SUCKO FIXME.
# Functions to clean channel input and output queues
#
stipulate
#
fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest)
=>
clean rest;
clean l => l;
end;
#
fun clean_rev ([], l)
=>
l;
clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest, l)
=>
clean_rev (rest, l);
clean_rev (x ! rest, l)
=>
clean_rev (rest, x ! l);
end;
herein
#
#
fun clean_and_remove (rwq::RW_QUEUE { front, back, ... } )
=
clean_front *front
where
fun clean_front [] => clean_back *back;
#
clean_front f
=>
case (clean f)
#
[] => clean_back *back;
(item ! rest) => { front := rest;
#
ITEM item;
};
esac;
end
also
fun clean_back [] => NO_ITEM;
#
clean_back r
=>
{ back := [];
#
case (clean_rev (r, []))
#
[] => NO_ITEM;
item ! rest
=>
{ front := rest;
#
ITEM item;
};
esac;
};
end;
end;
#
fun clean_and_enqueue (rwq::RW_QUEUE { front, back, ... }, item)
=
clean_front *front
where
fun clean_front [] => clean_back *back;
#
clean_front f
=>
case (clean f)
#
[] => clean_back *back;
f' => { front := f';
#
back := item ! *back;
};
esac;
end
also
fun clean_back [] => front := [ item ];
#
clean_back r
=>
case (clean_rev (r, []))
#
[] => { front := [item]; back := []; };
rr => { back := [item]; front := rr; };
esac;
end;
end;
end; # stipulate
# When a thread is resumed after being blocked
# on an oneshot get() op there may be other threads
# also blocked on the oneshot.
#
# This function is used to propagate the message
# to all of the threads that are blocked on the
# variable.
#
# It must be called from an uninterruptible scope.
# When the read_q is finally empty we end
# the uninterruptible scope.
#
# We must use "clean_and_remove" to get items
# from the read_q in the unlikely event that
# a single thread executes a choice of
# multiple gets on the same oneshot.
#
fun wake_remaining_microthreads_waiting_to_read_oneshot__xu (read_q, v)
=
case (clean_and_remove read_q)
#
NO_ITEM => log::uninterruptible_scope_mutex := 0;
ITEM (do1mailoprun_status, get_v)
=>
call_with_current_fate
(\\ old_fate
=
{ mps::enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread => mark_do1mailoprun_complete_and_return_thread do1mailoprun_status, old_fate };
#
switch_to_fate get_v v; #
}
);
esac;
# I-variables
#
make_oneshot_maildrop = make_cell;
same_oneshot_maildrop = same_cell;
#
fun put_in_oneshot (ONESHOT { read_q, value }, v)
=
{
mps::assert_not_in_uninterruptible_scope "put_in_oneshot";
log::uninterruptible_scope_mutex := 1;
#
case *value
#
NULL => {
value := THE v;
#
case (clean_and_remove read_q)
#
NO_ITEM =>
{
log::uninterruptible_scope_mutex := 0;
};
#
ITEM (do1mailoprun_status, get_v)
=>
call_with_current_fate
(
\\ old_fate
=
{
mps::enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread => mark_do1mailoprun_complete_and_return_thread do1mailoprun_status, old_fate };
#
switch_to_fate get_v v; #
}
);
esac;
};
THE _ =>
{
log::uninterruptible_scope_mutex := 0;
#
raise exception MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP;
};
esac;
};
#
fun get_from_oneshot (ONESHOT { read_q, value } )
=
{
mps::assert_not_in_uninterruptible_scope "get_from_oneshot";
log::uninterruptible_scope_mutex := 1;
#
case *value
#
NULL => {
msg = call_with_current_fate
(\\ fate
=
{
rwq::put_on_back_of_queue (read_q, (make_transaction_id(), fate));
#
mps::dispatch_next_thread__xu__noreturn ();
}
);
wake_remaining_microthreads_waiting_to_read_oneshot__xu (read_q, msg);
msg;
};
THE v => {
log::uninterruptible_scope_mutex := 0;
v;
};
esac;
};
#
fun get_from_oneshot' (ONESHOT { read_q, value } )
=
itt::BASE_MAILOPS [is_mailop_ready_to_fire]
where
fun suspend_then_eventually_fire_mailop # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__suspend_then_eventually_fire_mailops__loop # After setting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
}
=
# This fn gets used in
#
#
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# suspend_then_eventually_fire_mailop ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{
(call_with_current_fate
(\\ get_v
=
{ rwq::put_on_back_of_queue
( read_q,
(do1mailoprun_status, get_v)
);
return_to__suspend_then_eventually_fire_mailops__loop (); # Return control to mailop.pgk
raise exception DIE "maildrop: impossible"; # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
}
)
)
-> v; # Execution will pick up here when 'get_v(v)' is eventually called.
# This will happen when set() (above) eventually does: switch_to_fate get_v v;
finish_do1mailoprun (); # Keep any other mailops in the current select[...] from executing.
wake_remaining_microthreads_waiting_to_read_oneshot__xu (read_q, v); #
v;
};
#
fun is_mailop_ready_to_fire () # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
=
case *value
#
NULL => itt::UNREADY_MAILOP suspend_then_eventually_fire_mailop;
#
THE v => itt::READY_MAILOP
{
fire_mailop => {. log::uninterruptible_scope_mutex := 0; # Reppy refers to 'fire_mailop' as 'doFn'.
v;
}
};
esac;
end;
#
fun nonblocking_get_from_oneshot (ONESHOT { read_q, value } )
=
{
mps::assert_not_in_uninterruptible_scope "nonblocking_get_from_oneshot";
log::uninterruptible_scope_mutex := 1;
#
case *value
#
THE v => { log::uninterruptible_scope_mutex := 0;
#
THE v;
};
NULL => NULL;
esac;
};
}; # package maildrop1
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.