## mailqueue.pkg # Derives from cml/src/core-cml/mailbox.sml
#
# Unbounded queues of thread-to-thread mail messages.
# Compiled by:
#
src/lib/std/standard.libstipulate
package fat = fate; # fate is from
src/lib/std/src/nj/fate.pkg package itt = internal_threadkit_types; # internal_threadkit_types is from
src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg package mic = microthread; # microthread is from
src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg package mps = microthread_preemptive_scheduler; # microthread_preemptive_scheduler is from
src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg #
Fate(X) = fat::Fate(X);
#
call_with_current_fate = fat::call_with_current_fate;
switch_to_fate = fat::switch_to_fate;
herein
package mailqueue: (weak)
api {
include api Mailqueue; # Mailqueue is from
src/lib/src/lib/thread-kit/src/core-thread-kit/mailqueue.api #
reset_mailqueue: Mailqueue(X) -> Void;
}
{
Queue(X) = { front: List(X),
back: List(X)
};
Mailqueue_State(X)
= EMPTY Queue( (Ref( itt::Do1mailoprun_Status ), Fate(X)))
| NONEMPTY Queue(X)
# NB: The queue of the NONEMPTY constructor should never be empty -- use EMPTY instead.
;
Mailqueue(X) = MAILQUEUE { id: Int, # Because sooner or later in debugging we'll want a mapping from mailqueues to other data.
reader: mic::Microthread, # The microthread reading from the mailqueue. Useful for debugging and display purposes.
state: Ref( Mailqueue_State(X) ),
put_count: Ref( Int ), # Total number of times 'put' has been called on this mailqueue.
taps: Ref( List( (Ref(Void), X -> Void) ) ) # Debug taps to be called by put_in_mailqueue(). The Ref(Void) values merely identify list entries for deletion.
};
empty_queue = EMPTY { front => [],
back => []
};
fun enqueue ( { front, back }, x)
=
{ front,
back => x ! back
};
fun dequeue ( { front => x ! rest, back } ) => ( { front=>rest, back }, x);
dequeue ( { front => [], back } ) => dequeue { front=> reverse back, back=> [] };
end;
# Remove item from queue and return it plus new queue.
# This will raise an exception if queue is empty, but
# caller guarantees that queue is nonempty.
fun dequeue_all ( { front => x, back => [] } ) => x;
dequeue_all ( { front => [], back => y } ) => reverse y;
dequeue_all ( { front => x, back => y } ) => (x @ (reverse y));
end;
# Remove all items from queue and return as a list.
# This will raise an exception if queue is empty, but
# caller guarantees that queue is nonempty.
fun reset_mailqueue (MAILQUEUE { state, ... })
=
state := empty_queue;
stipulate
#
next_id = REF 1;
herein
fun issue_unique_id ()
=
{ result = *next_id;
next_id = result + 1; # Note that we cannot be pre-empted because body of fn contains no function calls.
result;
};
end;
fun make_mailqueue reader # 'reader' is the microthread which will be reading the mailqueue -- useful to know for debugging and display.
=
MAILQUEUE { reader, #
id => issue_unique_id (),
state => REF empty_queue,
put_count => REF 0, # Total number of times 'put' has been called on this mailqueue.
taps => REF []
};
fun same_mailqueue ( MAILQUEUE { id => id1, ... },
MAILQUEUE { id => id2, ... }
)
=
id1 == id2; #
fun make__mailop_done__refcell ()
=
REF (itt::DO1MAILOPRUN_IS_BLOCKED (mps::get_current_microthread()));
fun end_do1mailoprun_and_return_thread (do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread_state))
=>
{ do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
#
thread_state;
};
end_do1mailoprun_and_return_thread (REF (itt::DO1MAILOPRUN_IS_COMPLETE))
=>
raise exception DIE "Compiler bug: Attempt to cancel already-cancelled transaction"; # Never happens; here to suppress 'nonexhaustive match' compile warning.
end;
Mailqueue_Item(X)
#
= NO_ITEM
#
| ITEM ( Ref( itt::Do1mailoprun_Status ),
# 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
Fate(X),
Mailqueue_State(X)
) # This really should be a record. XXX SUCKO FIXME.
;
fun get_mailqueue_id (MAILQUEUE { id, ... })
=
id;
fun get_mailqueue_reader (MAILQUEUE { reader, ... })
=
reader;
fun get_mailqueue_putcount (MAILQUEUE { put_count, ... })
=
*put_count;
fun get_mailqueue_length (mailqueue: Mailqueue(X))
=
{ mailqueue -> MAILQUEUE { state => REF state, ... };
#
case state
#
EMPTY _ => 0;
NONEMPTY { front, back } => { (length front) + (length back); };
esac;
};
fun mailqueue_to_string (MAILQUEUE { state => (REF state), ... }, name) # Debug support, primarily to textify mailqueue state for logging via log::note or such.
=
sprintf "{MQ:%s %s }" name (sprint_state state)
where
fun sprint_thread thread
=
{ thread -> itt::MICROTHREAD { thread_id, task, ... };
task -> itt::APPTASK { task_name, task_id, ... };
#
sprintf "%d:%d" thread_id task_id;
};
#
fun sprint_readqueue q
=
string::join " " (map sprint_q_entry q)
where
fun sprint_q_entry (REF (itt::DO1MAILOPRUN_IS_COMPLETE), _)
=>
"*";
sprint_q_entry (REF (itt::DO1MAILOPRUN_IS_BLOCKED microthread), _)
=>
sprint_thread microthread;
end;
end;
fun sprint_state state
=
case state
#
EMPTY { front, back } => { sprintf "EMPTY [%s
|%s]" (sprint_readqueue front) (sprint_readqueue back); };
NONEMPTY { front, back } => { f = (length front); r = (length back); sprintf "NONEMPTY [%d
|%d]=%d)" f r (f+r); };
esac;
end;
stipulate
fun clean ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest) => clean rest;
clean l => l;
end;
fun clean_rev ([], l)
=>
l;
clean_rev ((REF itt::DO1MAILOPRUN_IS_COMPLETE, _) ! rest, l)
=>
clean_rev (rest, l);
clean_rev (x ! rest, l)
=>
clean_rev (rest, x ! l);
end;
herein
fun clean_and_remove (q as { front, back } )
=
clean_front front
where
fun clean_front []
=>
clean_back back;
clean_front f
=>
case (clean f)
#
[] => clean_back back;
((id, k) ! rest)
=>
ITEM (id, k, EMPTY { front=>rest, back } );
esac;
end
also
fun clean_back []
=>
NO_ITEM;
clean_back r
=>
case (clean_rev (r, []))
#
[] => NO_ITEM;
(id, k) ! rest => ITEM (id, k, EMPTY { front=>rest, back => [] } );
esac;
end;
end;
end;
fun put_in_mailqueue (MAILQUEUE { state => qstate, put_count, taps, ... }, x)
=
{
mps::assert_not_in_uninterruptible_scope "put_in_mailqueue";
log::uninterruptible_scope_mutex := 1;
#
put_count := *put_count + 1;
#
apply' *taps (\\ (id, tap) = tap(x));
#
case *qstate
#
EMPTY q
=>
case (clean_and_remove q)
#
NO_ITEM =>
{
qstate := NONEMPTY { front => [x], back => [] };
#
log::uninterruptible_scope_mutex := 0;
};
ITEM (do1mailoprun_status, get_fate, qstate')
=>
call_with_current_fate
(\\ old_fate
=
{ qstate := qstate';
#
mps::enqueue_old_thread_plus_old_fate_then_install_new_thread { new_thread => end_do1mailoprun_and_return_thread do1mailoprun_status, old_fate };
switch_to_fate get_fate x; #
}
);
esac;
NONEMPTY q
=>
call_with_current_fate # We force a context switch here to reduce the risk of a producer outrunning its consumer.
# # XXX SUCKO FIXME I think it would be better to context-switch only when we have >=16 things in queue.
(\\ fate # This would allow batching of xdraw commands for efficiency and improve cache locality of codetraces.
=
{ qstate := NONEMPTY (enqueue (q, x));
#
mps::yield_to_next_thread__xu fate;
}
);
esac;
};
fun get_msg__xu (qstate, q)
=
{
(dequeue q) -> (q', msg);
#
case q'
#
{ front => [],
back => []
}
=> qstate := empty_queue;
_ => qstate := NONEMPTY q';
esac;
log::uninterruptible_scope_mutex := 0;
msg;
};
fun get_msgs__xu (qstate, q)
=
{
msgs = dequeue_all q;
#
qstate := empty_queue;
log::uninterruptible_scope_mutex := 0;
msgs;
};
fun take_from_mailqueue (mq as MAILQUEUE { state => qstate, ... })
=
{
mps::assert_not_in_uninterruptible_scope "take_from_mailqueue";
log::uninterruptible_scope_mutex := 1;
#
case *qstate
#
EMPTY q
=>
{ msg = call_with_current_fate
(
\\ get_fate
=
{ qstate := EMPTY (enqueue (q, (make__mailop_done__refcell(), get_fate)));
#
mps::dispatch_next_thread__xu__noreturn ();
}
);
log::uninterruptible_scope_mutex := 0;
msg;
};
NONEMPTY q
=>
get_msg__xu (qstate, q);
esac;
};
fun take_all_from_mailqueue (mq as MAILQUEUE { state => qstate, ... })
=
{
mps::assert_not_in_uninterruptible_scope "take_all_from_mailqueue";
log::uninterruptible_scope_mutex := 1;
#
case *qstate
#
EMPTY q
=>
{ msg = call_with_current_fate
(
\\ get_fate
=
{ qstate := EMPTY (enqueue (q, (make__mailop_done__refcell(), get_fate)));
#
mps::dispatch_next_thread__xu__noreturn ();
}
);
log::uninterruptible_scope_mutex := 0;
[ msg ];
};
NONEMPTY q
=>
get_msgs__xu (qstate, q);
esac;
};
fun take_from_mailqueue' (mq as MAILQUEUE { state => qstate, ... })
=
{
fun suspend_then_eventually_fire_mailop
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__suspend_then_eventually_fire_mailops__loop # After starting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
}
=
# This fn gets used in
#
#
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# suspend_then_eventually_fire_mailop ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
{ q = case *qstate EMPTY q => q;
/* */ NONEMPTY _ => raise exception DIE "Unsupported NONEMPTY case in suspend_then_eventually_fire_mailop"; # Should be impossible, since is_mailop_ready_to_fire() (below)
esac; # only queues us up if *qstate is EMPTY.
(call_with_current_fate
#
(\\ get_fate
=
{ qstate := EMPTY (enqueue (q, (do1mailoprun_status, get_fate)));
#
return_to__suspend_then_eventually_fire_mailops__loop (); # Return control to mailop.pkg.
raise exception DIE "Mailqueue: impossible"; # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
}
)
)
-> msg; # Execution will pick up on this line when 'get_fate" is eventually called.
finish_do1mailoprun ();
log::uninterruptible_scope_mutex := 0;
msg;
};
fun is_mailop_ready_to_fire ()
=
case *qstate
#
EMPTY _ => itt::UNREADY_MAILOP suspend_then_eventually_fire_mailop;
#
NONEMPTY q
=>
{ qstate := NONEMPTY q;
#
itt::READY_MAILOP { fire_mailop => {. get_msg__xu (qstate, q); } };
};
esac;
itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]; # Recall that in essence a base mailop *is* an is_mailop_ready_to_fire -- see
src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg };
fun take_all_from_mailqueue' (mq as MAILQUEUE { state => qstate, ... })
=
{
fun suspend_then_eventually_fire_mailop
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__suspend_then_eventually_fire_mailops__loop # After starting up a mailop-ready-to-fire watch, we call this to return control to mailop.pkg.
}
=
{ q = case *qstate EMPTY q => q;
/* */ NONEMPTY _ => raise exception DIE "Unsupported NONEMPTY case in /suspend_then_eventually_fire_mailop"; # Should be impossible, since is_mailop_ready_to_fire() (below)
esac; # only queues us up if *qstate is EMPTY.
(call_with_current_fate
#
(\\ get_fate
=
{ qstate := EMPTY (enqueue (q, (do1mailoprun_status, get_fate)));
#
return_to__suspend_then_eventually_fire_mailops__loop (); # Return control to mailop.pkg.
raise exception DIE "Mailqueue: impossible"; # return_to__suspend_then_eventually_fire_mailops__loop() should never return.
}
)
)
-> msg; # Execution will pick up on this line when 'get_fate" is eventually called.
finish_do1mailoprun ();
log::uninterruptible_scope_mutex := 0;
[ msg ];
};
fun is_mailop_ready_to_fire ()
=
case *qstate
#
EMPTY _ => itt::UNREADY_MAILOP suspend_then_eventually_fire_mailop;
#
NONEMPTY q
=>
{ qstate := NONEMPTY q;
#
itt::READY_MAILOP { fire_mailop => {. get_msgs__xu (qstate, q); } };
};
esac;
itt::BASE_MAILOPS [ is_mailop_ready_to_fire ]; # Recall that in essence a base mailop *is* an is_mailop_ready_to_fire -- see
src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg };
fun note_mailqueue_tap (MAILQUEUE { taps, ... }, tap)
=
{ id = REF ();
taps := (id, tap) ! *taps;
id;
};
fun drop_mailqueue_tap (MAILQUEUE { taps, ... }, id_to_drop)
=
taps := list::remove
(\\ (id, tap) = id == id_to_drop)
*taps;
}; # package mailqueue
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.