PreviousUpNext

15.4.912  src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-io-manager.pkg

## threadkit-io-manager.pkg
#
# See comments in   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-io-manager.api

# Compiled by:
#     src/lib/std/standard.lib


stipulate
    package fat =  fate;                                # fate                          is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;            # internal_threadkit_types      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package tim =  time;                                # time                          is from   src/lib/std/time.pkg
    package ts  =  thread_scheduler;                    # thread_scheduler              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
    package wnx =  winix;                               # winix                         is from   src/lib/std/winix.pkg
    package wio =  winix::io;                           # winix_io                      is from   src/lib/std/src/posix/winix-io.pkg
herein

    package  threadkit_io_manager
    : (weak) Threadkit_Io_Manager                       # Threadkit_Io_Manager          is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-io-manager.api
    {

                                                        # Wait_Request                  def in    src/lib/src/lib/thread-kit/src/winix/threadkit-winix-io.api

        Io_Descriptor =  wio::Io_Descriptor;
        Wait_Request  =  wio::Wait_Request;
        Wait_Result   =  wio::Wait_Result;

        Io_Wait_Item
            =
            { wait_request:     Wait_Request,
              transaction_id:   Ref( itt::Transaction_Id ),
              #
              clean_up:         Void -> Void,
              fate:             fat::Fate( Wait_Result )
            };

        waiting =   REF ([]: List( Io_Wait_Item ));                                                     # Icky thread-hostile mutable global state...? XXX SUCKO FIXME

        # In some OSs (e.g., Linux)
        # this may raise an EINTR error,
        # even though it is non-blocking.
        #
        fun poll wait_requests
            =
            wio::wait_for_io_opportunity  { wait_requests,  timeout => THE tim::zero_time }
            except
                _ = [];

        # NOTE:  As in the case of condition variables -- see 
        #     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
        # -- we need to do the clean_up routine when we enable the
        # io_mailop (instead of in the wait_for fate).
        #
        fun io_mailop  wait_request
            =
            itt::BASE_MAILOPS [is_ready]
            where

                fun wait_for { transaction_id, clean_up, next }                                 # Reppy calls this fn blockFn
                    =
                    pi
                    where
                        pi  =   fat::call_with_current_fate
                                    (
                                     fn fate
                                        =
                                        {   item = { wait_request, transaction_id, clean_up, fate };

                                            waiting := item ! *waiting;
                                            next ();
                                            raise exception FAIL "impossible: io_mailop";
                                        }
                                    );
                    end;

                fun is_ready ()                                                                 # Reppy calls this fn pollFn
                    =
                    case (poll [wait_request])
                        #
                        [pi] =>  itt::MAILOP_READY
                                   { priority  =>  -1,
                                     do_it     =>  .{  ts::reenable_thread_switching ();   pi;  }       # Reppy calls this field doFn
                                   };

                        _    =>  itt::MAILOP_UNREADY wait_for;
                    esac;
            end;


        fun same_descriptor (pi, wait_request)
            =
            pi == wait_request;


        # Take an I/O waiting queue and return
        # the cleaned queue along with the list
        # of wait_requests in it.
        #
        # 'Cleaning' consists of dropping cancelled
        # transactions from the wait queue:     
        #
        fun clean wait_queue
            =
            clean' (wait_queue, [], [])
            where
                fun clean' ([] : List( Io_Wait_Item ), wait_requests, q)
                        =>
                        (wait_requests, q);                                             # Done.

                    clean' ( { transaction_id => REF itt::CANCELLED_TRANSACTION_ID, ... } ! rest, wait_requests, wait_queue)
                        =>
                        clean' (rest, wait_requests, wait_queue);                       # Ignore cancelled transaction.

                    clean' ((item as { wait_request, ... } ) ! rest, wait_requests, wait_queue)
                        =>
                        clean'                                                  # Pass everything else through.
                          ( rest,
                            wait_request ! wait_requests,
                            item ! wait_queue
                          );
                end;
            end;


        # Enqueue a thread that is polling on the ready queue.
        #
        # We have to do some fate hacking to pass the
        # poll info to the thread.
        #
        # We also must catch the case where the transaction
        # has been canceled, since a single thread might be
        # polling on multiple descriptors.
        #
        fun enqueue
                ( { transaction_id as REF (itt::TRANSACTION_ID id),
                    clean_up,
                    fate,
                    wait_request
                  },
                  pi
                )
                =>
                {   ufate = fat::call_with_current_fate
                                (
                                 fn kfate
                                    =
                                    {   fat::call_with_current_fate
                                            (
                                             fn ufate
                                                =
                                                fat::resume_fate  kfate  ufate
                                            );

                                        fat::resume_fate  fate  pi;
                                    }
                                );

                    transaction_id :=  itt::CANCELLED_TRANSACTION_ID;

                    clean_up ();

                    ts::enqueue_thread (id, ufate);
                };

            enqueue ( { transaction_id => REF itt::CANCELLED_TRANSACTION_ID, ... }, _)
                =>
                ();
        end;


        fun poll_io ()
            =
            case (clean *waiting)
                #         
                ([], _)
                    =>
                    waiting := [];

                (wait_requests, wait_queue)
                    =>
                    case (poll wait_requests)
                        #
                        [] =>  waiting :=  list::reverse  wait_queue;
                        #
                        l  => filter (l, wait_queue, [])
                              where
                                  fun filter ([], r, wait_queue)
                                          =>
                                          waiting
                                              :=
                                              list::reverse_and_prepend
                                                  (r, wait_queue);

                                      filter
                                          ( pi ! pis,
                                            (item:  Io_Wait_Item) ! items,
                                            wait_queue
                                          )
                                          =>
                                          if (same_descriptor (pi, item.wait_request))
                                              #
                                              enqueue (item, pi);
                                              filter (pis, items, wait_queue);
                                          else
                                              filter (pi ! pis,  items,  item ! wait_queue);
                                          fi;

                                      filter _ =>   raise exception FAIL "Compiler bug: Unsupported case in poll_io/filter.";
                                  end;
                              end;
                    esac;

            esac;


        fun any_waiting ()
            =
            case *waiting
                #
                [] => FALSE;
                 _ => TRUE;
            esac;

    };
end;



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext