## io-now-possible-mailop.pkg
#
# See comments in
src/lib/src/lib/thread-kit/src/core-thread-kit/io-now-possible-mailop.api# Compiled by:
#
src/lib/std/standard.libstipulate
package fat = fate; # fate is from
src/lib/std/src/nj/fate.pkg package itt = internal_threadkit_types; # internal_threadkit_types is from
src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg package tim = time; # time is from
src/lib/std/time.pkg package mps = microthread_preemptive_scheduler; # microthread_preemptive_scheduler is from
src/lib/src/lib/thread-kit/src/core-thread-kit/microthread-preemptive-scheduler.pkg package wnx = winix__premicrothread; # winix__premicrothread is from
src/lib/std/winix--premicrothread.pkg package wio = winix__premicrothread::io; # winix_io__premicrothread is from
src/lib/std/src/posix/winix-io--premicrothread.pkg #
call_with_current_fate = fat::call_with_current_fate;
switch_to_fate = fat::switch_to_fate;
herein
# This package gets referenced in:
#
#
src/lib/std/src/threadkit/posix/winix-io.pkg #
src/lib/std/src/socket/proto-socket.pkg #
src/lib/std/src/posix/winix-data-file-io-driver-for-posix.pkg #
src/lib/src/lib/thread-kit/src/posix/threadkit-driver-for-posix.pkg #
package io_now_possible_mailop
: (weak) Io_Now_Possible_Mailop # Io_Now_Possible_Mailop is from
src/lib/src/lib/thread-kit/src/core-thread-kit/io-now-possible-mailop.api {
Ioplea_Item
=
{ ioplea: wio::Ioplea, # Ioplea = { io_descriptor: Iod, readable: Bool, writable: Bool, oobdable: Bool };
do1mailoprun_status: Ref( itt::Do1mailoprun_Status ), # Do1mailoprun_Status = DO1MAILOPRUN_IS_COMPLETE
| DO1MAILOPRUN_IS_BLOCKED Thread; do1mailoprun_status is a mutex enforcing the one-mailop-fires-per-select() rule.
#
finish_do1mailoprun: Void -> Void, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
fate: fat::Fate( wio::Ioplea_Result )
};
waiting_queue__local = REF ([]: List( Ioplea_Item )); # Icky thread-hostile mutable global state...? XXX SUCKO FIXME
#
fun check_for_io_opportunities wait_requests
=
wio::wait_for_io_opportunity # In some OSs (e.g., Linux) this may raise an EINTR error even though it is non-blocking.
{
wait_requests,
timeout => THE tim::zero_time
}
except
_ = [];
# NOTE: As in the case of condition variables -- see
#
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg # -- we need to do the finish_do1mailoprun routine when we enable the
# io_mailop (instead of in the mailop-now-ready-to-fire fate).
#
fun io_now_possible_on' ioplea # This fn is called (only) from
src/lib/std/src/threadkit/posix/winix-io.pkg = #
src/lib/std/src/socket/proto-socket.pkg itt::BASE_MAILOPS [is_mailop_ready_to_fire] #
src/lib/std/src/posix/winix-data-file-io-driver-for-posix.pkg where
fun suspend_then_eventually_fire_mailop # Reppy refers to 'suspend_then_eventually_fire_mailop' as 'blockFn'.
{
do1mailoprun_status, # 'do_one_mailop' is supposed to fire exactly one mailop: 'do1mailoprun_status' is basically a mutex enforcing this.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
return_to__suspend_then_eventually_fire_mailops__loop # After setting up a mailop-ready-to-fire watch, we call this fn to return control to mailop.pkg.
}
=
# This fn gets used in
#
#
src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg #
# when a
#
# do_one_mailop [ ... ]
#
# call has no mailops ready to fire. 'do_one_mailop' must then block until
# at least one mailop is ready to fire. It does this by calling the
#
# suspend_then_eventually_fire_mailop ()
#
# fn on each mailop in the list; each such call will typically
# make an entry in one or more run queues of blocked threads.
#
# The first mailop to fire cancels the rest by doing
#
# do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE;
#
an_ioplea
where
(call_with_current_fate
(
\\ fate
=
{ item = { ioplea, do1mailoprun_status, finish_do1mailoprun, fate };
#
waiting_queue__local := item ! *waiting_queue__local;
#
return_to__suspend_then_eventually_fire_mailops__loop (); # Return control to mailop.pkg.
#
raise exception DIE "io_mailop' -- return_to__suspend_then_eventually_fire_mailops__loop returned";
# return_to__suspend_then_eventually_fire_mailops__loop() should never return.
}
)
)
-> an_ioplea; # Execution will pick up on this like when 'fate' is eventually invoked.
# This will happen below in add_any_new_fd_io_opportunities_to_run_queue__iu() after a C-level
end; # select()/poll() indicates that fd read/write is possible (i.e., data in input buffer or space in output buffer).
fun is_mailop_ready_to_fire () # Reppy refers to 'is_mailop_ready_to_fire' calls this 'pollFn'.
= # If it returns READY_MAILOP then the mailop is a candidate to fire in the do_one_mailop [] call.
case (check_for_io_opportunities [ioplea]) # If it returns UNREADY_MAILOP then the mailop is not a candidate to fire in the do_one_mailop [] call.
#
[an_ioplea]
=>
itt::READY_MAILOP
{
fire_mailop => {. log::uninterruptible_scope_mutex := 0; # Reppy refers to 'fire_mailop' as 'doFn'.
an_ioplea;
}
};
_ => itt::UNREADY_MAILOP suspend_then_eventually_fire_mailop;
esac;
end;
fun same_ioplea (an_ioplea, ioplea) # This is an internal fn, not exported to clients.
=
an_ioplea == ioplea;
fun drop_cancelled_mailops wait_queue
=
# We return the thinned I/O waiting queue
# along with the list of wait_requests in it.
#
# NB: Both will be in reverse order relative
# to wait_queue arg:
#
drop_cancelled_mailops' (wait_queue, [], [])
where
fun drop_cancelled_mailops' ([] : List( Ioplea_Item ), wait_requests, wait_queue)
=>
(wait_requests, wait_queue); # Done.
drop_cancelled_mailops' ({ do1mailoprun_status => REF itt::DO1MAILOPRUN_IS_COMPLETE, ... } ! rest, wait_requests, wait_queue)
=>
drop_cancelled_mailops' (rest, wait_requests, wait_queue); # Drop completed/cancelled mailop.
drop_cancelled_mailops' ((item as { ioplea, ... } ) ! rest, wait_requests, wait_queue)
=>
drop_cancelled_mailops' # Pass everything else through.
( rest,
ioplea ! wait_requests,
item ! wait_queue
);
end;
end;
fun push_io_onto_run_queue # Private to this file.
(
{ do1mailoprun_status as REF (itt::DO1MAILOPRUN_IS_BLOCKED thread), # The wait_queue entry corresponding to 'an_ioplea'.
finish_do1mailoprun, # Do any required end-of-do1mailoprun work such as do1mailoprun_status := DO1MAILOPRUN_IS_COMPLETE; and sending nacks as appropriate.
fate,
ioplea
},
an_ioplea # The I/O opportunity now open.
)
=>
# Enqueue a thread that is polling on the ready queue.
#
# We need two call_with_current_fate calls here:
#
# o One to capture current fate, so we can continue
# with it when we are done.
#
# o One to construct the fate we are going to enter
# into the run queue. This mainly consists of
# combining our 'fate' arg with our 'an_ioplea' arg to
# produce a new fate implicitly wrapping both.
#
# We also must catch the case where the mailop
# has been canceled, since a single thread might be
# polling on multiple descriptors.
#
{ (call_with_current_fate
(
\\ main_fate # 'main_fate' represents the vanilla computation starting below with -> fate_ioplea;
=
{ call_with_current_fate
(\\ fate_ioplea # 'fate_ioplea' represents the computation 'switch_to_fate fate ioplea'.
= # (Since switch_to_fate never returns, that is the whole of 'fate_ioplea'.)
switch_to_fate main_fate fate_ioplea); # This arranges for the main fate to execute as expected. Without this line
# # we'd immediately run off and do 'switch_to_fate fate ioplea' and the do1mailoprun_status := ... ; ... stuff would never execute.
switch_to_fate fate an_ioplea;
}
)
)
-> fate_ioplea; # Execution picks up here when someone eventually calls main_fate with an argument.
# 'fate_ioplea' is essentially 'fate ioplea'. It has type Fate(Void).
do1mailoprun_status := itt::DO1MAILOPRUN_IS_COMPLETE;
finish_do1mailoprun ();
mps::push_into_run_queue (thread, fate_ioplea);
};
push_io_onto_run_queue ( { do1mailoprun_status => REF itt::DO1MAILOPRUN_IS_COMPLETE, ... }, _)
=>
();
end;
fun add_any_new_fd_io_opportunities_to_run_queue__iu () # This is an external entrypoint into this file.
= # This fn is called (only) from:
src/lib/src/lib/thread-kit/src/posix/threadkit-driver-for-posix.pkg case (drop_cancelled_mailops *waiting_queue__local)
#
([], _) => waiting_queue__local := [];
#
(wait_requests, wait_queue) # NB: wait_requests and wait_queue are both reverse-order relative to *waiting_queue__local.
=>
case (check_for_io_opportunities wait_requests)
#
[] => waiting_queue__local := reverse wait_queue; # The 'reverse' restores original *waiting_queue__local ordering.
#
l => filter (l, wait_queue, [])
where
fun filter ([], r, result_wait_queue)
=>
waiting_queue__local
:=
list::reverse_and_prepend (r, result_wait_queue); # Reverse 'r' and prepend it to (already-re-reversed) result_wait_queue.
filter
( an_ioplea ! iopleas, # (Remaining) list of I/O opportunities identified by check_for_io_opportunities.
(item: Ioplea_Item) ! items, # (Remaining) reversed wait_queue.
result_wait_queue #
)
=>
if (same_ioplea (an_ioplea, item.ioplea)) # We're searching down our wait_queue for the item matching 'an_ioplea'.
# # Aha -- found the wait_queue item matching 'an_ioplea'.
push_io_onto_run_queue (item, an_ioplea); # Schedule the I/O corresponding to 'an_ioplea'.
filter (iopleas, items, result_wait_queue); # Drop I/O opportunity from wait list.
else
filter (an_ioplea ! iopleas, items, item ! result_wait_queue); # Not the right item -- move it to result_wait_queue and keep searching down wait_queue.
fi;
filter _ => raise exception DIE "Compiler bug: Unsupported case in add_any_new_fd_io_opportunities_to_run_queue__iu/filter.";
end;
end;
esac;
esac;
fun have_fds_on_io_watch ()
=
case *waiting_queue__local
#
[] => FALSE;
_ => TRUE;
esac;
};
end;