


## oneshot-maildrop.pkg
#
# The implementation of Id-style synchronizing memory cells.
# Compiled by:
#     src/lib/std/standard.lib

### "We're fools whether we dance or not,
### so we might as well dance."
###
### -- Japanese proverb
stipulate
    # Short local names for the packages this file depends on.
    # Each alias sits on its own line because a '# ' comment runs to end of line:
    # anything following it on the same line is not seen by the compiler.
    #
    package fat =  fate;                                # fate                              is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;            # internal_threadkit_types          is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package rwq =  rw_queue;                            # rw_queue                          is from   src/lib/src/rw-queue.pkg
    package mps =  microthread_preemptive_scheduler;    # microthread_preemptive_scheduler  is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    Fate(X) =  fat::Fate(X);
    #
    call_with_current_fate =  fat::call_with_current_fate;
    switch_to_fate         =  fat::switch_to_fate;
herein
    package oneshot_maildrop
    :       Oneshot_Maildrop                            # Oneshot_Maildrop                  is from   src/lib/src/lib/thread-kit/src/core-thread-kit/oneshot-maildrop.api
    {
# We use the same underlying representation
# for both oneshots and maildrops:
#
# A cell carries three mutable fields:
#
#   priority -- integer counter; make_cell() starts it at 0 and bump_priority() advances it.
#   read_q   -- queue of (do1mailoprun status, fate) pairs; take_from_oneshot() adds one
#               such pair when it finds the cell empty.
#   value    -- NULL while the cell is empty, THE v once put_in_oneshot() has stored v.
#
Cell(X) = CELL { priority: Ref( Int ),
read_q: rwq::Rw_Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)) ),
value: Ref( Null_Or(X) )
};
# Oneshot_Maildrop is simply another name for Cell.
Oneshot_Maildrop(X) = Cell(X);
# Raised by put_in_oneshot() when the cell already holds a value.
exception MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP;
#
# Build a fresh cell: priority counter at zero,
# an empty queue of waiting readers, and no stored value.
#
fun make_cell ()
    =
    CELL { priority =>  REF 0,
           read_q   =>  rwq::make_rw_queue (),
           value    =>  REF NULL
         };
#
# Identity test for cells: compares the 'value' references
# of the two cells with '=='; all other fields are ignored.
#
fun same_cell (CELL { value => value_a, ... }, CELL { value => value_b, ... })
    =
    value_a == value_b;
#
# Allocate a new status reference in the DO1MAILOPRUN_IS_BLOCKED state,
# tagged with whatever mps::get_current_thread() reports.
#
fun make_transaction_id ()
    =
    {   current_thread =  mps::get_current_thread ();
        #
        REF (itt::DO1MAILOPRUN_IS_BLOCKED current_thread);
    };
#
# Flip a blocked status reference to DO1MAILOPRUN_IS_COMPLETE
# and return the thread that was recorded in it.
#
# Being handed a status which is already complete is treated as
# an internal error and raises FAIL.
#
fun mark_do1mailoprun_complete_and_return_thread  status
    =
    case *status
        #
        itt::DO1MAILOPRUN_IS_BLOCKED  blocked_thread
            =>
            {   status :=  itt::DO1MAILOPRUN_IS_COMPLETE;
                #
                blocked_thread;
            };

        itt::DO1MAILOPRUN_IS_COMPLETE
            =>
            raise exception FAIL "Compiler bug: Attempt to cancel already-cancelled transaction-id";      # Never happens; here to keep the match exhaustive.
    esac;
# Post-increment for a priority counter:
# store n+1 in the reference and hand back n.
#
fun bump_priority  priority
    =
    {   old_value =  *priority;
        #
        priority :=  old_value + 1;
        old_value;
    };
# Result type of clean_and_remove():
#
#   NO_ITEM -- no live (not-yet-complete) entry was found in the queue.
#   ITEM    -- the (status, fate) pair that was taken off the queue.
#
Qy_Item X
= NO_ITEM
| ITEM ((Ref(itt::Do1mailoprun_Status), Fate(X)) )
; # ITEM should probably host a record not a tuple. XXX SUCKO FIXME.
# Functions to clean channel input and output queues
#
stipulate
#
# Strip leading entries whose status is DO1MAILOPRUN_IS_COMPLETE.
# Scanning stops at the first entry that is not complete;
# that entry and everything after it is returned unchanged.
#
fun clean []
        =>
        [];

    clean (entries as ((status, _) ! rest))
        =>
        case *status
            #
            itt::DO1MAILOPRUN_IS_COMPLETE  =>  clean rest;
            _                              =>  entries;
        esac;
end;
#
# Walk the first list front to back, pushing every entry that is not
# DO1MAILOPRUN_IS_COMPLETE onto the accumulator.  The surviving entries
# therefore come out in reverse order, sitting in front of whatever the
# accumulator held on entry.
#
fun clean_rev ([], result)
        =>
        result;

    clean_rev ((entry as (status, _)) ! rest,  result)
        =>
        case *status
            #
            itt::DO1MAILOPRUN_IS_COMPLETE  =>  clean_rev (rest, result);
            _                              =>  clean_rev (rest, entry ! result);
        esac;
end;
herein
#
# Discard completed entries from the queue and report whether a live one remains.
#
# Returns:
#   0                        if no live entry is left in either 'front' or 'back';
#   bump_priority priority   otherwise (i.e. the old counter value; the counter itself is advanced).
#
# If 'front' turns out to hold only completed entries it is reset to [],
# so the dead entries (and the fates they hold) are neither rescanned on
# the next call nor kept reachable through the queue.
#
# When 'front' has no live entry, 'back' is emptied; its live entries,
# reversed by clean_rev, become the new 'front'.
#
fun clean_and_check (priority, rwq::RW_QUEUE { front, back } )
    =
    clean_front *front
    where
        fun clean_front []
                =>
                clean_back *back;
            #
            clean_front f
                =>
                case (clean f)
                    #
                    []  =>  {   front := [];                  # Everything in 'front' was stale: drop it.
                                #
                                clean_back *back;
                            };

                    f'  =>  {   front := f';
                                #
                                bump_priority priority;
                            };
                esac;
        end
        also
        fun clean_back []
                =>
                0;
            #
            clean_back r
                =>
                {   back := [];
                    #
                    case (clean_rev (r, []))
                        #
                        []  =>  0;

                        rr  =>  {   front := rr;
                                    #
                                    bump_priority priority;
                                };
                    esac;
                };
        end;
    end;
#
# Discard completed entries from the queue, then remove and return the first live one.
#
# Returns:
#   ITEM entry   with 'entry' taken off the queue, if a live entry exists;
#   NO_ITEM      if neither 'front' nor 'back' holds a live entry.
#
# If 'front' turns out to hold only completed entries it is reset to [],
# so the dead entries (and the fates they hold) are neither rescanned on
# the next call nor kept reachable through the queue.
#
# When 'front' has no live entry, 'back' is emptied; its live entries,
# reversed by clean_rev, supply the result and the new 'front'.
#
fun clean_and_remove (rwq::RW_QUEUE { front, back, ... } )
    =
    clean_front *front
    where
        fun clean_front []
                =>
                clean_back *back;
            #
            clean_front f
                =>
                case (clean f)
                    #
                    []  =>  {   front := [];                  # Everything in 'front' was stale: drop it.
                                #
                                clean_back *back;
                            };

                    (item ! rest)
                        =>
                        {   front := rest;
                            #
                            ITEM item;
                        };
                esac;
        end
        also
        fun clean_back []
                =>
                NO_ITEM;
            #
            clean_back r
                =>
                {   back := [];
                    #
                    case (clean_rev (r, []))
                        #
                        []  =>  NO_ITEM;

                        item ! rest
                            =>
                            {   front := rest;
                                #
                                ITEM item;
                            };
                    esac;
                };
        end;
    end;
#
# Discard completed entries from the queue, then append 'item'.
#
# If 'front' still has a live entry after cleaning, 'item' is pushed onto 'back'.
# Otherwise 'back' is cleaned and reversed: when that leaves nothing, 'item'
# becomes the sole content of 'front'; when it leaves live entries, they become
# 'front' and 'item' becomes the sole content of 'back'.
#
fun clean_and_enqueue (rwq::RW_QUEUE { front, back, ... }, item)
    =
    scrub_front *front
    where
        fun scrub_front []
                =>
                scrub_back *back;
            #
            scrub_front pending
                =>
                case (clean pending)
                    #
                    []    =>  scrub_back *back;

                    live  =>  {   front :=  live;
                                  #
                                  back  :=  item ! *back;
                              };
                esac;
        end
        also
        fun scrub_back []
                =>
                front := [ item ];
            #
            scrub_back pending
                =>
                case (clean_rev (pending, []))
                    #
                    []    =>  {   front := [item];
                                  back  := [];
                              };

                    live  =>  {   back  := [item];
                                  front := live;
                              };
                esac;
        end;
    end;
end; # stipulate
# When a thread is resumed after being blocked
# on a oneshot get() or maildrop empty() operation
# there may be other threads also blocked on the variable.
#
# This function is used to propagate the message
# to all of the threads that are blocked on the
# variable (or until one of them takes the value,
# in the maildrop case).
#
# It must be called from an uninterruptible scope.
# When the read_q is finally empty we end
# the uninterruptible scope.
#
# We must use "clean_and_remove" to get items
# from the read_q in the unlikely event that
# a single thread executes a choice of
# multiple gets on the same variable.
#
# Pass 'msg' on to the next live waiter in read_q, if there is one.
#
#   read_q -- queue of (status, fate) pairs for blocked readers.
#   msg    -- the value to hand to the waiter.
#
# When clean_and_remove finds no live waiter, log::uninterruptible_scope_mutex is set to 0.
# Otherwise the waiter's status is marked complete, the current fate is captured and passed
# (together with the waiter's thread) to the scheduler, and control switches to the waiter's
# fate with 'msg' as its value.  This function wakes at most one waiter per call;
# take_from_oneshot and take_from_oneshot' each call it again after they resume.
#
fun relay_msg (read_q, msg)
=
case (clean_and_remove read_q)
#
NO_ITEM => log::uninterruptible_scope_mutex := 0;
ITEM (do1mailoprun_status, fate)
=>
call_with_current_fate
(fn old_fate
=
# NOTE(review): judging by its name, this queues the current thread with old_fate for later
# resumption and makes the waiter's thread current -- confirm in microthread-preemptive-scheduler.pkg.
{ mps::enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread => mark_do1mailoprun_complete_and_return_thread do1mailoprun_status, old_fate };
#
switch_to_fate fate msg; # Continue the waiter, delivering msg as the result of its call_with_current_fate.
}
);
esac;
# I-variables
#
# Oneshot-flavored names for the generic cell constructor (make_cell)
# and the cell identity test (same_cell).
#
make_oneshot_maildrop = make_cell;
same_oneshot_maildrop = same_cell;
#
# Store 'oneshot_value' in an empty cell and wake one waiting reader, if any.
#
# The whole operation runs with log::uninterruptible_scope_mutex set to 1.
#
#   Cell already full  -- the mutex is cleared and MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP is raised.
#   No live reader     -- the value is stored and the mutex is cleared.
#   Live reader found  -- the value is stored, the reader's status is marked complete, the
#                         current fate is handed to the scheduler along with the reader's thread,
#                         'priority' is set to 1, and control switches to the reader's fate with
#                         the value.  The mutex is NOT cleared on this path by this function.
#
fun put_in_oneshot (CELL { priority, read_q, value }, oneshot_value)
    =
    {   log::uninterruptible_scope_mutex := 1;
        #
        case *value
            #
            THE _
                =>
                {   log::uninterruptible_scope_mutex := 0;
                    #
                    raise exception MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP;
                };

            NULL
                =>
                {   value := THE oneshot_value;
                    #
                    case (clean_and_remove read_q)
                        #
                        NO_ITEM
                            =>
                            log::uninterruptible_scope_mutex := 0;

                        ITEM (reader_status, reader_fate)
                            =>
                            call_with_current_fate
                                (fn writer_fate
                                    =
                                    {   reader_thread =  mark_do1mailoprun_complete_and_return_thread  reader_status;
                                        #
                                        mps::enqueue_old_thread_plus_old_fate_then_install_new_thread
                                            {
                                              new_thread =>  reader_thread,
                                              old_fate   =>  writer_fate
                                            };
                                        #
                                        priority := 1;
                                        switch_to_fate  reader_fate  oneshot_value;
                                    }
                                );
                    esac;
                };
        esac;
    };
#
# Read the value of a cell, blocking until one has been stored.
#
#   Cell full   -- log::uninterruptible_scope_mutex is cleared and the stored value returned.
#   Cell empty  -- the current fate is queued on read_q under a fresh transaction id and
#                  the scheduler is asked to dispatch another thread.  When this fate is
#                  later resumed with a value, relay_msg passes that value on to the next
#                  waiter (or clears the mutex if none is left) and the value is returned.
#
fun take_from_oneshot (CELL { read_q, value, ... } )
    =
    {   log::uninterruptible_scope_mutex := 1;
        #
        case *value
            #
            THE stored
                =>
                {   log::uninterruptible_scope_mutex := 0;
                    #
                    stored;
                };

            NULL
                =>
                {   received
                        =
                        call_with_current_fate
                            (fn my_fate
                                =
                                {   transaction_id =  make_transaction_id ();
                                    #
                                    rwq::put_on_back_of_queue (read_q, (transaction_id, my_fate));
                                    #
                                    mps::dispatch_next_thread__usend__noreturn ();
                                }
                            );

                    relay_msg (read_q, received);
                    #
                    received;
                };
        esac;
    };
#
# Mailop version of take_from_oneshot: instead of reading the cell directly,
# build a one-element itt::BASE_MAILOPS list wrapping is_mailop_ready_to_fire.
#
#   is_mailop_ready_to_fire  -- inspects the cell: if it holds a value, reports
#                               MAILOP_IS_READY_TO_FIRE with a thunk that returns it;
#                               if it is empty, reports MAILOP_IS_NOT_READY_TO_FIRE
#                               and supplies set_up_mailopready_watch.
#   set_up_mailopready_watch -- queues the current fate on read_q so that a later
#                               switch_to_fate on it delivers the value.
#
fun take_from_oneshot' (CELL { priority, read_q, value } )
=
itt::BASE_MAILOPS [is_mailop_ready_to_fire]
where
fun set_up_mailopready_watch # Reppy refers to 'set_up_mailopready_watch' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__set_up_mailopready_watches__loop # After setting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
}
=
# This fn gets used in
#
# src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# set_up_mailopready_watch ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{
(call_with_current_fate
(fn fate
=
{ rwq::put_on_back_of_queue
( read_q,
(do1mailoprun_status, fate)
);
return_to__set_up_mailopready_watches__loop (); # Return control to mailop.pkg
raise exception FAIL "maildrop: impossible"; # return_to__set_up_mailopready_watches__loop() should never return.
}
)
)
-> oneshot_value; # Execution will pick up here when 'fate' is eventually called.
# This will happen when put_in_oneshot() or relay_msg() (above) eventually does: switch_to_fate fate <value>;
finish_do1mailoprun (); # Keep any other mailops in the current select[...] from executing.
relay_msg (read_q, oneshot_value); # relay_msg() takes care of log::uninterruptible_scope_mutex := 0; for us.
oneshot_value;
};
#
fun is_mailop_ready_to_fire () # Reppy refers to 'is_mailop_ready_to_fire' as 'pollFn'.
=
case *value
#
NULL => itt::MAILOP_IS_NOT_READY_TO_FIRE set_up_mailopready_watch;
#
THE v => itt::MAILOP_IS_READY_TO_FIRE
{
priority => bump_priority priority, # Reports the old counter value; the counter itself is advanced by one.
#
fire_mailop => .{ priority := 1; # Reppy refers to 'fire_mailop' as 'doFn'.
log::uninterruptible_scope_mutex := 0;
v;
}
};
esac;
end;
#
# Peek at a cell without blocking.
#
# Returns THE v if the cell currently holds v, NULL if it is empty.
#
# The read is bracketed by setting log::uninterruptible_scope_mutex to 1
# and back to 0.  The mutex is cleared on BOTH outcomes: previously the
# empty-cell path returned NULL with the mutex still set to 1, unlike
# every other returning path in this package.
#
fun nonblocking_take_from_oneshot (CELL { priority, read_q, value } )
    =
    {   log::uninterruptible_scope_mutex := 1;
        #
        result =  *value;                                   # Already has the shape we return: NULL or THE v.
        #
        log::uninterruptible_scope_mutex := 0;
        #
        result;
    };
}; # package oneshot_maildrop
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2013,
## released per terms of SMLNJ-COPYRIGHT.


