PreviousUpNext

15.4.941  src/lib/src/lib/thread-kit/src/lib/mailcaster.pkg

## mailcaster.pkg

# Compiled by:
#     src/lib/std/standard.lib

# Asynchronous multicast (one-to-many) mail distribution.
# This implementation follows the condition-variable-based
# design for multicast channels.
# See Chapter 5 of "Concurrent Programming in ML" for details.



###                 "The horns came riding in like the
###                  rainbow masts of silver ships."
###
###                          -- Peter S. Beagle, "The Last Unicorn"



stipulate
    package md1 = oneshot_maildrop;                                     # oneshot_maildrop      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/oneshot-maildrop.pkg
    package md  = maildrop;                                             # maildrop              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.pkg
    package ms  = mailslot;                                             # mailslot              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailslot.pkg
    package th  = microthread;                                          # microthread           is from   src/lib/src/lib/thread-kit/src/core-thread-kit/microthread.pkg

    # Local shorthand for the mailop operator; it is used infix in
    # receive' to attach get_msg to the take-from-mailslot mailop.
    #
    (==>) =  mailop::(==>);

    # Unqualified names for the mailslot operations used
    # throughout the package body below.
    #
    put_in_mailslot  =  ms::put_in_mailslot;
    take_from_mailslot  =  ms::take_from_mailslot;
    take_from_mailslot' =  ms::take_from_mailslot';
herein

    package   mailcaster
    : (weak)  Mailcaster                                                # Mailcaster            is from   src/lib/src/lib/thread-kit/src/lib/mailcaster.api
    {


        # Local name for mailop::Mailop, so that the type can be
        # referred to as Mailop(X) within this package.
        # NOTE(review): presumably required by the Mailcaster api
        # (e.g. for the result type of receive') -- confirm against mailcaster.api.
        #
        Mailop(X)
            =
            mailop::Mailop(X);

        # A mailcaster handle: the two mailslots used to talk to the
        # server thread started by make_mailcaster.
        #   First  slot: carries Request values to the server.
        #   Second slot: carries back the Readqueue which the server
        #                builds in answer to a NEW_QUEUE request.
        #
        Mailcaster(X)
            =
            MAILCASTER
              ( ms::Mailslot( Request(   X ) ),
                ms::Mailslot( Readqueue(X) )
              )

        # One reader's view of the message stream.
        #   First  component: mailslot on which the queue's forwarding thread
        #                     (see internal_make_readqueue) offers
        #                     (message, next oneshot) pairs.
        #   Second component: maildrop holding the oneshot for this queue's
        #                     current position; get_msg swaps in the next
        #                     oneshot and clone_readqueue reads it.
        #
        also
        Readqueue(X)
          =
          READQUEUE
            ( ms::Mailslot ((X, md1::Oneshot_Maildrop( Mc_State(X)))),
              md::Maildrop (md1::Oneshot_Maildrop( Mc_State(X)))
            )

        # Requests understood by the server loop in make_mailcaster:
        #   MESSAGE m  -- sent by transmit:       append m to the message chain.
        #   NEW_QUEUE  -- sent by make_readqueue: build a new Readqueue and
        #                 return it on the reply slot.
        #
        also
        Request(X)
          = MESSAGE(X)
          | NEW_QUEUE

        # One link in the message chain: a message together with the
        # oneshot maildrop which will receive the following link.
        #
        also
        Mc_State(X)
            =
            MCSTATE( (X, md1::Oneshot_Maildrop( Mc_State(X))) );

        # Internal make-readqueue function.
        # This is not externally visible:
        #
        fun internal_make_readqueue reply_1shot
            =
            {   out_ch         =   ms::make_mailslot ();
                state_maildrop =   md::make_full_maildrop reply_1shot;

                fun tee reply_1shot
                    =
                    {   (md1::get_from_oneshot  reply_1shot)
                            ->
                            (MCSTATE (v, next_cv));

                        put_in_mailslot (out_ch, (v, next_cv));
                        tee next_cv;
                    };

                  th::make_thread
                      #
                      "mailcaster internal_make_readqueue"
                      #
                      {. tee reply_1shot; };

                  READQUEUE (out_ch, state_maildrop);
            };

        fun make_mailcaster ()
            =
            {   plea_slot =  ms::make_mailslot ();
                reply_slot   =  ms::make_mailslot ();

                fun server cv
                    =
                    case (take_from_mailslot plea_slot)
                        #
                        NEW_QUEUE =>    {   put_in_mailslot (reply_slot, internal_make_readqueue  cv);
                                            server cv;
                                        };

                        MESSAGE m =>    {   next_cv = md1::make_oneshot_maildrop ();
                                            #
                                            md1::put_in_oneshot (cv, MCSTATE (m, next_cv));
                                            server next_cv;
                                        };
                    esac;

                  th::make_thread  "mailcaster"  {. server (md1::make_oneshot_maildrop ()); };

                  MAILCASTER (plea_slot, reply_slot);
              };

        fun make_readqueue (MAILCASTER (plea_slot, reply_slot))
            =
            {   put_in_mailslot (plea_slot, NEW_QUEUE);
                take_from_mailslot reply_slot;
            };

        fun transmit (MAILCASTER (slot, _), m)
            =
            put_in_mailslot (slot, MESSAGE m);

        fun clone_readqueue (READQUEUE(_, state_maildrop))
            =
            internal_make_readqueue (md::get_from_maildrop state_maildrop);

        fun get_msg state_maildrop (v, next_cv)
            =
            {   md::maildrop_swap (state_maildrop, next_cv);
                v;
            };

        fun receive (READQUEUE (slot, state_maildrop))
            =
            get_msg state_maildrop (take_from_mailslot slot);

        fun receive' (READQUEUE (slot, state_maildrop))
            =
            take_from_mailslot'  slot
                ==>
                get_msg  state_maildrop;

    };                                          # package mailcaster
end;


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext