


## mailqueue.pkg # Derives from cml/src/core-cml/mailbox.sml
#
# Unbounded queues of thread-to-thread mail messages.
# Compiled by:
#     src/lib/std/standard.lib

# Local abbreviations used throughout this file.
# Each declaration sits on its own line: a '#' comment runs to end-of-line,
# so sharing a line with a preceding comment would comment the declaration out.
stipulate
    package fat =  fate;					# fate					is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;			# internal_threadkit_types		is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package mps =  microthread_preemptive_scheduler;		# microthread_preemptive_scheduler	is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg
    #
    Fate(X) =  fat::Fate(X);					# Unqualified alias for the fate type.
    #
    call_with_current_fate =  fat::call_with_current_fate;	# Unqualified aliases for the two fate operations used below.
    switch_to_fate         =  fat::switch_to_fate;
herein
package mailqueue: (weak)
api {
include Mailqueue; # Mailqueue is from src/lib/src/lib/thread-kit/src/core-thread-kit/mailqueue.api #
reset_mailqueue: Mailqueue(X) -> Void;
}
{
# Functional FIFO queue represented as a pair of lists:
#   o enqueue() pushes new items onto the head of 'rear'.
#   o dequeue() pops items off the head of 'front', refilling
#     'front' from the reversed 'rear' when 'front' is exhausted.
Queue(X) = { front: List(X),
rear: List(X)
};
# Add 'item' at the back of the queue.
# Returns a fresh queue record; 'front' is passed through
# untouched and 'item' becomes the new head of 'rear'.
fun enqueue ( { front => head_items, rear => tail_items }, item)
    =
    { front =>  head_items,
      rear  =>  item ! tail_items
    };
# Remove the oldest item from the queue.
# Returns (new_queue, item).
#
# When 'front' is exhausted it is refilled from the reversed 'rear'.
#
# Callers guarantee that the queue is nonempty.  Should that invariant
# ever be violated we raise FAIL:  Without the explicit empty-queue rule
# the refill rule would match { front => [], rear => [] } again and
# again, recursing forever instead of reporting the problem.
#
fun dequeue { front => x ! rest,  rear       } =>  ({ front => rest, rear }, x);
    dequeue { front => [],        rear => [] } =>  raise exception FAIL "mailqueue: dequeue called on empty queue";
    dequeue { front => [],        rear       } =>  dequeue { front => list::reverse rear, rear => [] };
end;
# A mailqueue is always in one of two states:
#
#   EMPTY     No messages are waiting.  The queue holds (do1mailoprun_status, fate)
#             pairs instead -- one per reader waiting for a message; see
#             take_from_mailqueue() and take_from_mailqueue'() which add them.
#
#   NONEMPTY  Messages are waiting.  The queue holds the messages themselves.
#
Mailqueue_State(X)
= EMPTY Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)))
| NONEMPTY (Int, Queue(X)) # The 'Int' is a priority -- starts at 1, incremented by is_mailop_ready_to_fire ().
; # NB: The queue of the NONEMPTY constructor should never be empty -- use EMPTY instead.
# A mailqueue proper is just a refcell holding the current state, so it can be updated in place.
Mailqueue(X) = MAILQUEUE Ref( Mailqueue_State(X) );
# The state of a mailqueue holding neither messages nor waiting readers.
# Used for freshly made and reset mailqueues, and by get_msg__xu() when the last message is removed.
empty_queue = EMPTY { front => [],
rear => []
};
# Return the mailqueue to its initial state, discarding
# whatever messages or waiting readers it currently holds.
fun reset_mailqueue (MAILQUEUE state_cell)
    =
    {   state_cell :=  empty_queue;
    };
# Create a new mailqueue holding no messages and no waiting readers.
# Each call allocates a fresh refcell, which is what same_mailqueue() compares.
fun make_mailqueue ()
    =
    {   state_cell =  REF empty_queue;
        #
        MAILQUEUE state_cell;
    };
# TRUE iff both arguments are the very same mailqueue.
# Identity is decided by comparing the state refcells:  a refcell is
# equal only to itself, and distinct mailqueues never share a refcell.
fun same_mailqueue (MAILQUEUE cell_a, MAILQUEUE cell_b)
    =
    cell_a == cell_b;
# Make a fresh do1mailoprun-status refcell for the calling thread.
# The cell starts out as DO1MAILOPRUN_IS_BLOCKED, remembering the current thread.
fun make__mailop_done__refcell ()
    =
    {   current_thread =  mps::get_current_thread ();
        #
        REF (itt::DO1MAILOPRUN_IS_BLOCKED current_thread);
    };
# Mark a blocked do1mailoprun as complete and return the thread that was recorded in its status cell.
# Raises FAIL if the cell is already marked complete.
fun end_do1mailoprun_and_return_thread  do1mailoprun_status
    =
    case *do1mailoprun_status
        #
        itt::DO1MAILOPRUN_IS_BLOCKED thread_state
            =>
            {   do1mailoprun_status :=  itt::DO1MAILOPRUN_IS_COMPLETE;
                #
                thread_state;
            };

        itt::DO1MAILOPRUN_IS_COMPLETE
            =>
            raise exception FAIL "Compiler bug: Attempt to cancel already-cancelled transaction";	# Never happens; here to keep the match exhaustive.
    esac;
# Result type of clean_and_remove():
#
#   NO_ITEM   No waiting reader with a still-uncompleted do1mailoprun was found.
#
#   ITEM      One such reader was found and removed.  The triple holds the reader's
#             do1mailoprun status cell, the reader's saved fate, and the EMPTY
#             mailqueue state that remains once the reader has been removed.
#
Mailqueue_Item(X)
#
= NO_ITEM
#
| ITEM ( Ref( itt::Do1mailoprun_Status ), # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
Fate(X),
Mailqueue_State(X)
) # This really should be a record. XXX SUCKO FIXME.
;
# Number of messages currently waiting in the mailqueue.
# An EMPTY mailqueue reports zero regardless of how many readers are waiting on it.
fun mailqueue_length (mailqueue: Mailqueue(X))
    =
    case mailqueue
        #
        MAILQUEUE (REF (EMPTY _))
            =>
            0;

        MAILQUEUE (REF (NONEMPTY (_, { front, rear } )))
            =>
            (list::length front) + (list::length rear);
    esac;
# Render a short human-readable summary of the mailqueue state:
# the state constructor, the priority (NONEMPTY only), and the lengths
# of the front list, the rear list, and both together.
fun mailqueue_to_string (mailqueue: Mailqueue(X))
    =
    case mailqueue
        #
        MAILQUEUE (REF (EMPTY { front, rear } ))
            =>
            {   front_len =  list::length front;
                rear_len  =  list::length rear;
                #
                sprintf "EMPTY [%d|%d]=%d"  front_len  rear_len  (front_len + rear_len);
            };

        MAILQUEUE (REF (NONEMPTY (priority, { front, rear } )))
            =>
            {   front_len =  list::length front;
                rear_len  =  list::length rear;
                #
                sprintf "NONEMPTY(%d,[%d|%d]=%d)"  priority  front_len  rear_len  (front_len + rear_len);
            };
    esac;
stipulate
    # Drop leading entries whose do1mailoprun has already completed.
    # Stops at the first entry which is still pending.
    #
    fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! remaining) =>  clean remaining;
        clean pending                                              =>  pending;
    end;

    # Reverse 'todo' onto 'result_so_far', dropping every
    # entry whose do1mailoprun has already completed.
    #
    fun clean_rev ([], result_so_far)
            =>
            result_so_far;

        clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! todo,  result_so_far)
            =>
            clean_rev (todo, result_so_far);

        clean_rev (entry ! todo,  result_so_far)
            =>
            clean_rev (todo,  entry ! result_so_far);
    end;
herein
    # Find and remove the oldest waiting reader whose do1mailoprun
    # has not yet completed, discarding completed entries met on the way.
    #
    # We look in 'front' first.  Only if that yields nothing do we fall
    # back to 'rear', which gets reversed (and fully cleaned) in the process
    # and becomes the new 'front'.
    #
    # Returns NO_ITEM if no pending reader exists, otherwise an ITEM holding the
    # reader's status cell, its fate, and the EMPTY state left after removing it.
    #
    fun clean_and_remove { front, rear }
        =
        case (clean front)
            #
            (id, k) ! rest
                =>
                ITEM (id, k, EMPTY { front => rest, rear } );

            []  =>
                case (clean_rev (rear, []))
                    #
                    (id, k) ! rest =>  ITEM (id, k, EMPTY { front => rest, rear => [] } );
                    []             =>  NO_ITEM;
                esac;
        esac;
end;
# Deliver message 'x' to the mailqueue.
#
# There are three cases, selected by the current state and by clean_and_remove():
#
#   o EMPTY, no pending reader:   The state becomes NONEMPTY with priority 1 and x as the only message.
#
#   o EMPTY, pending reader:      The oldest pending reader is removed from the wait queue
#                                 and x is passed directly to that reader's saved fate.
#
#   o NONEMPTY:                   x is appended to the queued messages and we yield to the next thread.
#
fun put_in_mailqueue (MAILQUEUE qstate, x)
=
{
log::uninterruptible_scope_mutex := 1;
# NOTE(review): Within this fn the flag is set back to 0 only on the NO_ITEM path.
# On the ITEM path the resumed reader clears it (see take_from_mailqueue / take_from_mailqueue');
# on the NONEMPTY path presumably mps::yield_to_next_thread__xu does -- confirm.
#
case *qstate
#
EMPTY q
=>
# 'q' holds waiting readers, some of which may already have completed their do1mailoprun elsewhere.
case (clean_and_remove q)
#
NO_ITEM =>
{
qstate := NONEMPTY (1, { front => [x], rear => [] } );
#
log::uninterruptible_scope_mutex := 0;
};
ITEM (do1mailoprun_status, get_fate, qstate')
=>
call_with_current_fate
(fn old_fate
=
{ qstate := qstate'; # Wait queue minus the reader we are about to wake.
#
# Mark the reader's do1mailoprun complete, and hand its thread plus our own fate to the scheduler.
mps::enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread => end_do1mailoprun_and_return_thread do1mailoprun_status, old_fate };
switch_to_fate get_fate x; # Resume the reader, giving it x as the value of its call_with_current_fate.
}
);
esac;
NONEMPTY (p, q)
=>
call_with_current_fate # We force a context switch here to prevent a producer from outrunning a consumer.
#
(fn fate
=
{ qstate := NONEMPTY (p, enqueue (q, x)); # Priority 'p' is carried over unchanged.
#
mps::yield_to_next_thread__xu fate;
}
);
esac;
};
# Remove and return the oldest message in message queue 'q',
# storing the resulting state back into 'qstate':
#   o If that was the last message the mailqueue reverts to empty_queue.
#   o Otherwise it stays NONEMPTY, with the priority reset to 1.
# Clears log::uninterruptible_scope_mutex before returning.
# Caller guarantees that 'q' is nonempty.
#
fun get_msg__xu (qstate, q)
    =
    {   (dequeue q) ->  (remaining, msg);
        #
        qstate :=   case remaining
                        #
                        { front => [], rear => [] } =>  empty_queue;
                        _                           =>  NONEMPTY (1, remaining);
                    esac;

        log::uninterruptible_scope_mutex := 0;

        msg;
    };
# Take the oldest message from the mailqueue and return it.
#
#   o NONEMPTY:  The message is dequeued and returned immediately via get_msg__xu().
#
#   o EMPTY:     The caller's fate is parked on the wait queue, paired with a fresh
#                status refcell, and control passes to the scheduler.  Execution
#                resumes when put_in_mailqueue() switches to the parked fate,
#                supplying the message as the value of the call_with_current_fate.
#
fun take_from_mailqueue (mq as MAILQUEUE qstate)
=
{
log::uninterruptible_scope_mutex := 1;
#
case *qstate
#
EMPTY q
=>
{ msg = call_with_current_fate
(
fn get_fate
=
{ qstate := EMPTY (enqueue (q, (make__mailop_done__refcell(), get_fate))); # Join the back of the queue of waiting readers.
#
mps::dispatch_next_thread__usend__noreturn (); # Per its name this does not return; we continue below only via get_fate.
}
);
log::uninterruptible_scope_mutex := 0; # Reached once a sender has resumed us with a message.
msg;
};
NONEMPTY (priority, q)
=>
get_msg__xu (qstate, q); # Also clears log::uninterruptible_scope_mutex.
esac;
};
# Mailop-valued version of take_from_mailqueue().
#
# Returns a base mailop built from is_mailop_ready_to_fire() (below):
#
#   o If the mailqueue is NONEMPTY when polled, the mailop reports itself ready
#     to fire;  firing it dequeues and returns the oldest message.
#
#   o If the mailqueue is EMPTY when polled, the mailop reports itself not ready
#     and supplies set_up_mailopready_watch(), which parks the caller's fate on
#     the mailqueue's wait queue until a message arrives.
#
fun take_from_mailqueue' (mq as MAILQUEUE qstate)
    =
    {
        fun set_up_mailopready_watch
              {
                do1mailoprun_status,						# 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
                finish_do1mailoprun,						# Do any required end-of-do1mailoprun work such as  do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;  and sending nacks as appropriate.
                return_to__set_up_mailopready_watches__loop			# After starting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
              }
            =
            # This fn gets used in
            #
            #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
            #
            # when a
            #
            #     do_one_mailop [ ... ]
            #
            # call has no mailops ready to fire.  'do_one_mailop' must then block until
            # at least one mailop is ready to fire.  It does this by calling the
            #
            #     set_up_mailopready_watch ()
            #
            # fn on each mailop in the list; each such call will typically
            # make an entry in one or more run queues of blocked threads.
            #
            # The first mailop to fire cancels the rest by doing
            #
            #     do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
            #
            {   q = case *qstate
                        #
                        EMPTY q    =>  q;
                        NONEMPTY _ =>  raise exception FAIL "Compiler bug: Unsupported NONEMPTY case in poll'/set_up_mailopready_watch";	# Should be impossible, since is_mailop_ready_to_fire() (below)
                    esac;														# only queues us up if *qstate is EMPTY.

                (call_with_current_fate
                    #
                    (fn get_fate
                        =
                        {   qstate := EMPTY (enqueue (q, (do1mailoprun_status, get_fate)));	# Join the back of the queue of waiting readers.
                            #
                            return_to__set_up_mailopready_watches__loop ();			# Return control to mailop.pkg.
                            raise exception FAIL "Mailqueue: impossible";			# return_to__set_up_mailopready_watches__loop() should never return.
                        }
                    )
                )
                    -> msg;									# Execution will pick up on this line when 'get_fate' is eventually called.

                finish_do1mailoprun ();
                log::uninterruptible_scope_mutex := 0;
                msg;
            };

        # Poll the mailqueue on behalf of do_one_mailop.
        # Each poll which finds the mailqueue NONEMPTY bumps the stored priority by one,
        # while reporting the priority as it was before the bump.
        #
        fun is_mailop_ready_to_fire ()
            =
            case *qstate
                #
                EMPTY _ =>  itt::MAILOP_IS_NOT_READY_TO_FIRE  set_up_mailopready_watch;
                #
                NONEMPTY (priority, q)
                    =>
                    {   qstate := NONEMPTY (priority+1, q);
                        #
                        itt::MAILOP_IS_READY_TO_FIRE
                          {
                            priority,
                            fire_mailop =>  .{
                                                get_msg__xu (qstate, q);
                                            }
                          };
                    };
            esac;

        itt::BASE_MAILOPS [ is_mailop_ready_to_fire ];			# Recall that in essence a base mailop *is* an is_mailop_ready_to_fire -- see   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    };
# Non-blocking take:  returns THE oldest message if one is
# waiting, otherwise NULL.  Never parks the calling thread.
# log::uninterruptible_scope_mutex is set on entry and cleared again
# on both paths (by get_msg__xu() when a message is returned).
#
fun nonblocking_take_from_mailqueue (MAILQUEUE qstate)
    =
    {   log::uninterruptible_scope_mutex := 1;
        #
        case *qstate
            #
            NONEMPTY (_, messages)
                =>
                THE (get_msg__xu (qstate, messages));

            EMPTY _
                =>
                {   log::uninterruptible_scope_mutex := 0;
                    #
                    NULL;
                };
        esac;
    };
}; # package mailqueue
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2013,
## released per terms of SMLNJ-COPYRIGHT.


