## threadkit-io-manager.pkg
#
# See comments in    src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-io-manager.api

# Compiled by:
#     src/lib/std/standard.lib

stipulate
    package fat =  fate;                        # fate                      is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;    # internal_threadkit_types  is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package tim =  time;                        # time                      is from   src/lib/std/time.pkg
    package ts  =  thread_scheduler;            # thread_scheduler          is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
    package wnx =  winix;                       # winix                     is from   src/lib/std/winix.pkg
    package wio =  winix::io;                   # winix_io                  is from   src/lib/std/src/posix/winix-io.pkg
herein
package threadkit_io_manager
: (weak) Threadkit_Io_Manager                   # Threadkit_Io_Manager is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-io-manager.api
{
# Wait_Request def in    src/lib/src/lib/thread-kit/src/winix/threadkit-winix-io.api
#
# These three types are simply re-exported from winix::io:
#
Io_Descriptor =  wio::Io_Descriptor;
Wait_Request  =  wio::Wait_Request;
Wait_Result   =  wio::Wait_Result;

# One entry on the 'waiting' list -- everything this package
# records about a thread that is blocked waiting for I/O:
#
Io_Wait_Item
    =
    { wait_request:    Wait_Request,                    # The I/O condition being waited for; handed to poll().
      transaction_id:  Ref( itt::Transaction_Id ),      # enqueue() overwrites this with CANCELLED_TRANSACTION_ID; clean() drops such items.
      #
      clean_up:        Void -> Void,                    # Called by enqueue() just before the thread is rescheduled.
      fate:            fat::Fate( Wait_Result )         # Resumed with the poll result once the wait is satisfied.
    };
# The list of items for threads currently blocked on I/O.
# io_mailop's wait_for pushes new items onto the front;
# poll_io rebuilds the list each time it runs;
# any_waiting just tests it for emptiness.
#
waiting = REF ([]: List( Io_Wait_Item )); # Icky thread-hostile mutable global state...? XXX SUCKO FIXME
# Probe the given list of wait requests with a zero timeout
# and return whatever wio::wait_for_io_opportunity reports.
#
# On some OSs (e.g., Linux) that call may raise an EINTR
# error even though it is non-blocking, so any exception
# is treated as "nothing is ready" and yields the empty list.
#
fun poll requests
    =
    wio::wait_for_io_opportunity
      {
        wait_requests =>  requests,
        timeout       =>  THE tim::zero_time
      }
    except
        _ = [];
# NOTE: As in the case of condition variables -- see
#
#     src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
#
# -- we need to do the clean_up routine when we enable the
# io_mailop (instead of in the wait_for fate).
#
# Build the mailop used to wait for the I/O condition
# described by 'wait_request'.
#
# The result is an itt::BASE_MAILOPS value holding the
# single is_ready function defined below.
#
fun io_mailop wait_request
    =
    itt::BASE_MAILOPS [is_ready]
    where
        # Blocking path.  Captures the current fate, records an
        # Io_Wait_Item for it at the head of 'waiting', then calls
        # the supplied next().  The FAIL is raised only if next()
        # ever returns.  The value later passed to the captured
        # fate becomes the result of wait_for.
        #
        fun wait_for { transaction_id, clean_up, next } # Reppy calls this fn blockFn
            =
            pi
            where
                pi = fat::call_with_current_fate
                    (
                        fn fate
                            =
                            { item = { wait_request, transaction_id, clean_up, fate };
                              waiting := item ! *waiting;
                              next ();
                              raise exception FAIL "impossible: io_mailop";
                            }
                    );
            end;
        # Polling path.  Probes just this one request:  if poll()
        # returns exactly one result the mailop is ready at
        # priority -1, and its do_it calls
        # ts::reenable_thread_switching() and then yields that
        # result.  Any other poll outcome reports MAILOP_UNREADY
        # together with wait_for.
        #
        fun is_ready () # Reppy calls this fn pollFn
            =
            case (poll [wait_request])
                #
                [pi] => itt::MAILOP_READY
                          { priority => -1,
                            do_it => .{ ts::reenable_thread_switching (); pi; } # Reppy calls this field doFn
                          };
                _ => itt::MAILOP_UNREADY wait_for;
            esac;
    end;
# TRUE iff the given poll result compares equal (==)
# to the given wait request.
#
fun same_descriptor (poll_result, request)
    =
    (poll_result == request);
# Sweep an I/O wait queue, dropping every item whose
# transaction_id currently holds CANCELLED_TRANSACTION_ID.
#
# Returns the pair
#
#     (wait_requests, surviving_items)
#
# Both lists are built by prepending, so each comes back in
# the reverse of the input order, with the Nth request
# belonging to the Nth surviving item.
#
fun clean wait_queue
    =
    sweep (wait_queue, [], [])
    where
        fun sweep ([] : List( Io_Wait_Item ), requests, survivors)
                =>
                (requests, survivors);                          # Input exhausted.

            sweep ( { transaction_id => REF itt::CANCELLED_TRANSACTION_ID, ... } ! remaining, requests, survivors)
                =>
                sweep (remaining, requests, survivors);         # Cancelled -- drop it.

            sweep ((live as { wait_request, ... } ) ! remaining, requests, survivors)
                =>
                sweep                                           # Still live -- keep both item and request.
                  ( remaining,
                    wait_request ! requests,
                    live ! survivors
                  );
        end;
    end;
# Enqueue a thread that is polling on the ready queue.
#
# We have to do some fate hacking to pass the
# poll info to the thread.
#
# We also must catch the case where the transaction
# has been canceled, since a single thread might be
# polling on multiple descriptors.
#
# Arguments:  an Io_Wait_Item plus the poll result 'pi'
# that satisfied it.
#
# First clause (transaction still live):
#   o Build 'ufate'.  The inner call_with_current_fate captures
#     the point just before 'fat::resume_fate fate pi' and hands
#     that fate out through 'kfate', so 'ufate' names a fate
#     which, when resumed, resumes the waiter's 'fate' with 'pi'.
#     (This reading assumes call_with_current_fate/resume_fate
#     behave like callcc/throw -- confirm against fate.pkg.)
#   o Mark the transaction cancelled, so that any other item
#     sharing this transaction_id ref gets dropped by clean().
#   o Run the item's clean_up routine.
#   o Hand (id, ufate) to ts::enqueue_thread.
#
# Second clause (transaction already cancelled):  do nothing.
#
fun enqueue
        ( { transaction_id as REF (itt::TRANSACTION_ID id),
            clean_up,
            fate,
            wait_request
          },
          pi
        )
        =>
        { ufate = fat::call_with_current_fate
              (
                  fn kfate
                      =
                      { fat::call_with_current_fate
                            (
                                fn ufate
                                    =
                                    fat::resume_fate kfate ufate
                            );
                        fat::resume_fate fate pi;
                      }
              );
          transaction_id := itt::CANCELLED_TRANSACTION_ID;
          clean_up ();
          ts::enqueue_thread (id, ufate);
        };
    enqueue ( { transaction_id => REF itt::CANCELLED_TRANSACTION_ID, ... }, _)
        =>
        ();
end;
# Poll every still-live wait request once.
#
# The wait queue is first passed through clean().  Each item
# whose request shows up in the poll results is handed to
# enqueue(); all remaining items are stored back in 'waiting'.
#
fun poll_io ()
    =
    case (clean *waiting)
        #
        ([], _)
            =>
            waiting := [];                                      # No live requests remain.

        (requests, pending)
            =>
            case (poll requests)
                #
                [] => waiting := list::reverse pending;         # Nothing ready:  undo the reversal done by clean().
                #
                ready => filter (ready, pending, [])
                         where
                             # Walk the poll results and the pending items side by side.
                             # 'kept' accumulates the items that were not ready.
                             #
                             fun filter ([], untested, kept)
                                     =>
                                     waiting
                                         :=
                                         list::reverse_and_prepend
                                             (untested, kept);  # Results exhausted:  the rest are all still waiting.

                                 filter
                                     ( hit ! more_hits,
                                       (item: Io_Wait_Item) ! items,
                                       kept
                                     )
                                     =>
                                     if (not (same_descriptor (hit, item.wait_request)))
                                         #
                                         filter (hit ! more_hits, items, item ! kept);
                                     else
                                         enqueue (item, hit);
                                         filter (more_hits, items, kept);
                                     fi;

                                 filter _ => raise exception FAIL "Compiler bug: Unsupported case in poll_io/filter.";
                             end;
                         end;
            esac;
    esac;
# TRUE iff the 'waiting' list is non-empty.
#
# The list is inspected as-is (clean() is not called), so
# cancelled items that have not yet been swept out still count.
#
fun any_waiting ()
    =
    case *waiting
        #
        (_ ! _) =>  TRUE;
        []      =>  FALSE;
    esac;
};
end;