## mailcaster.pkg
# Compiled by:
#     src/lib/std/standard.lib

# Asynchronous multicast (one-to-many) mail distribution.
# This implementation is based on a condition variable
# implementation of multicast channels.
# See Chapter 5 of "Concurrent Programming in ML" for details.
### "The horns came riding in like the
### rainbow masts of silver ships."
###
### -- Peter S. Beagle, "The Last Unicorn"
stipulate
    package md1 =  oneshot_maildrop;            # oneshot_maildrop is from   src/lib/src/lib/thread-kit/src/core-thread-kit/oneshot-maildrop.pkg
    package md  =  maildrop;                    # maildrop is from   src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.pkg
    package ms  =  mailslot;                    # mailslot is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailslot.pkg
    package th  =  microthread;                 # microthread is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg
    #
    # Short local names for the operations used below:
    #
    (==>) = mailop::(==>);
    put_in_mailslot = ms::put_in_mailslot;
    take_from_mailslot = ms::take_from_mailslot;
    take_from_mailslot' = ms::take_from_mailslot';
herein

    package mailcaster
    : (weak) Mailcaster                         # Mailcaster is from   src/lib/src/lib/thread-kit/src/lib/mailcaster.api
    {
        Mailop(X)
            =
            mailop::Mailop(X);

        # A mailcaster is a pair of mailslots shared with its server thread:
        #   o The first carries Request values to the server
        #     (see 'transmit' and 'make_readqueue').
        #   o The second carries freshly made readqueues back from
        #     the server (see 'make_readqueue').
        #
        Mailcaster(X)
            =
            MAILCASTER
              ( ms::Mailslot( Request( X ) ),
                ms::Mailslot( Readqueue(X) )
              )

        # A readqueue is one reader's view of the message stream:
        #   o The mailslot delivers (value, next-oneshot) pairs produced
        #     by the readqueue's private 'tee' thread.
        #   o The maildrop remembers the oneshot the reader has reached;
        #     it is advanced by 'get_msg' and read by 'clone_readqueue'.
        #
        also
        Readqueue(X)
            =
            READQUEUE
              ( ms::Mailslot ((X, md1::Oneshot_Maildrop( Mc_State(X)))),
                md::Maildrop (md1::Oneshot_Maildrop( Mc_State(X)))
              )

        # Requests understood by the server thread:
        #   MESSAGE m   -- broadcast m to every readqueue.
        #   NEW_QUEUE   -- create a readqueue and send it on the reply slot.
        #
        also
        Request(X)
            = MESSAGE(X)
            | NEW_QUEUE

        # One cell of the message chain: a value plus the
        # oneshot that will hold the cell for the following message.
        #
        also
        Mc_State(X)
            =
            MCSTATE( (X, md1::Oneshot_Maildrop( Mc_State(X))) );


        # Internal make-readqueue function.
        # This is not externally visible.
        #
        # Given the oneshot at which reading should start, build a
        # READQUEUE whose maildrop initially holds that oneshot, and
        # start a 'tee' thread which walks the chain: it gets the
        # MCSTATE from the current oneshot, puts the (value, next-oneshot)
        # pair into the readqueue's mailslot, then continues with the
        # next oneshot.  The loop has no exit.
        #
        # NOTE(review): get_from_oneshot presumably blocks until the
        # server has filled the oneshot -- confirm against oneshot-maildrop.api.
        #
        fun internal_make_readqueue reply_1shot
            =
            {   out_ch = ms::make_mailslot ();
                state_maildrop = md::make_full_maildrop reply_1shot;

                fun tee reply_1shot
                    =
                    {   (md1::get_from_oneshot reply_1shot)
                            ->
                            (MCSTATE (v, next_cv));

                        put_in_mailslot (out_ch, (v, next_cv));

                        tee next_cv;
                    };

                th::make_thread
                    #
                    "mailcaster internal_make_readqueue"
                    #
                    {. tee reply_1shot; };

                READQUEUE (out_ch, state_maildrop);
            };


        # Create a mailcaster.
        #
        # Makes the request and reply mailslots and starts a server
        # thread holding the oneshot for the next message ('cv'):
        #   o On NEW_QUEUE it builds a readqueue starting at 'cv' and
        #     puts it in the reply slot; 'cv' is unchanged.
        #   o On MESSAGE m it makes a fresh oneshot, stores
        #     MCSTATE (m, fresh) in 'cv', and continues with the
        #     fresh oneshot as the new 'cv'.
        #
        fun make_mailcaster ()
            =
            {   plea_slot  = ms::make_mailslot ();
                reply_slot = ms::make_mailslot ();

                fun server cv
                    =
                    case (take_from_mailslot plea_slot)
                        #
                        NEW_QUEUE => {  put_in_mailslot (reply_slot, internal_make_readqueue cv);
                                        server cv;
                                     };

                        MESSAGE m => {  next_cv = md1::make_oneshot_maildrop ();
                                        #
                                        md1::put_in_oneshot (cv, MCSTATE (m, next_cv));
                                        server next_cv;
                                     };
                    esac;

                th::make_thread "mailcaster" {. server (md1::make_oneshot_maildrop ()); };

                MAILCASTER (plea_slot, reply_slot);
            };


        # Obtain a new readqueue on the given mailcaster:
        # send NEW_QUEUE to the server, then take the readqueue
        # it sends back on the reply slot.
        #
        fun make_readqueue (MAILCASTER (plea_slot, reply_slot))
            =
            {   put_in_mailslot (plea_slot, NEW_QUEUE);
                take_from_mailslot reply_slot;
            };


        # Broadcast message 'm': hand MESSAGE m to the server thread.
        #
        fun transmit (MAILCASTER (slot, _), m)
            =
            put_in_mailslot (slot, MESSAGE m);


        # Make a new readqueue which starts at the oneshot
        # currently recorded in the given readqueue's maildrop.
        #
        # NOTE(review): get_from_maildrop presumably reads the maildrop
        # without emptying it -- confirm against maildrop.api.
        #
        fun clone_readqueue (READQUEUE(_, state_maildrop))
            =
            internal_make_readqueue (md::get_from_maildrop state_maildrop);


        # Shared tail of receive and receive':
        # record 'next_cv' as the readqueue's new position
        # and return the message value 'v'.
        #
        fun get_msg state_maildrop (v, next_cv)
            =
            {   md::maildrop_swap (state_maildrop, next_cv);
                v;
            };


        # Take the next (value, next-oneshot) pair from the
        # readqueue's mailslot and return the value via get_msg.
        #
        fun receive (READQUEUE (slot, state_maildrop))
            =
            get_msg state_maildrop (take_from_mailslot slot);


        # Mailop form of 'receive': the take_from_mailslot' mailop
        # on the readqueue's mailslot, with get_msg attached via ==>.
        #
        fun receive' (READQUEUE (slot, state_maildrop))
            =
            take_from_mailslot' slot
                ==>
                get_msg state_maildrop;

    };                                          # package mailcaster
end;