## when.pkg
#
# See comments in
#
src/lib/src/when.api# Compiled by:
#
src/lib/std/standard.libstipulate
package fil = file__premicrothread; # file__premicrothread is from
src/lib/std/src/posix/file--premicrothread.pkg package psx = posixlib; # posixlib is from
src/lib/std/src/psx/posixlib.pkgherein
package when
: When # When is from
src/lib/src/when.api {
# posixlib is from
src/lib/std/src/psx/posixlib.pkg # winix__premicrothread is from
src/lib/std/winix--premicrothread.pkg # winix-guts is from
src/lib/std/src/posix/winix-guts.pkg # data_file__premicrothread is from
src/lib/std/src/posix/data-file--premicrothread.pkg # winix_data_file_for_os_g__premicrothread is from
src/lib/std/src/io/winix-data-file-for-os-g--premicrothread.pkg # socket__premicrothread is from
src/lib/std/socket--premicrothread.pkg # socket_guts is from
src/lib/std/src/socket/socket-guts.pkg When_Rule (A_af, A_sock_type)
= NONBLOCKING
| TIMEOUT_SECS Float
| FD_IS_READ_READY (psx::File_Descriptor, Void -> Void)
| FD_IS_WRITE_READY (psx::File_Descriptor, Void -> Void)
| FD_HAS_OOBD_READY (psx::File_Descriptor, Void -> Void)
| IOD_IS_READ_READY (winix__premicrothread::io::Iod, Void -> Void)
| IOD_IS_WRITE_READY (winix__premicrothread::io::Iod, Void -> Void)
| IOD_HAS_OOBD_READY (winix__premicrothread::io::Iod, Void -> Void)
| STREAM_IS_READ_READY (fil::Input_Stream, Void -> Void)
| STREAM_IS_WRITE_READY (fil::Output_Stream, Void -> Void)
| BINARY_STREAM_IS_READ_READY (data_file__premicrothread::Input_Stream, Void -> Void)
| BINARY_STREAM_IS_WRITE_READY (data_file__premicrothread::Output_Stream, Void -> Void)
| SOCKET_IS_READ_READY (socket__premicrothread::Socket( A_af, A_sock_type ), Void -> Void)
| SOCKET_IS_WRITE_READY (socket__premicrothread::Socket( A_af, A_sock_type ), Void -> Void)
| SOCKET_HAS_OOBD_READY (socket__premicrothread::Socket( A_af, A_sock_type ), Void -> Void)
;
fun timeout_secs secs = TIMEOUT_SECS secs ;
fun fd_is_read_ready fd callback = FD_IS_READ_READY ( fd, callback);
fun fd_is_write_ready fd callback = FD_IS_WRITE_READY ( fd, callback);
fun fd_has_oobd_ready fd callback = FD_HAS_OOBD_READY ( fd, callback);
fun iod_is_read_ready iod callback = IOD_IS_READ_READY ( iod, callback);
fun iod_is_write_ready iod callback = IOD_IS_WRITE_READY ( iod, callback);
fun iod_has_oobd_ready iod callback = IOD_HAS_OOBD_READY ( iod, callback);
fun stream_is_read_ready stream callback = STREAM_IS_READ_READY (stream, callback);
fun stream_is_write_ready stream callback = STREAM_IS_WRITE_READY (stream, callback);
fun binary_stream_is_read_ready stream callback = BINARY_STREAM_IS_READ_READY (stream, callback);
fun binary_stream_is_write_ready stream callback = BINARY_STREAM_IS_WRITE_READY (stream, callback);
fun socket_is_read_ready socket callback = SOCKET_IS_READ_READY (socket, callback);
fun socket_is_write_ready socket callback = SOCKET_IS_WRITE_READY (socket, callback);
fun socket_has_oobd_ready socket callback = SOCKET_HAS_OOBD_READY (socket, callback);
stipulate
package int_map
=
int_red_black_map;
State
=
{ # Maps from int file descriptors
# to corresponding callbacks to execute:
#
timeout: Null_Or( time::Time ), # Timeout: NULL means wait forever, (THE time::zero_time) means do not block.
requests: List( winix__premicrothread::io::Ioplea ),
read_callbacks: int_map::Map( Void -> Void ),
write_callbacks: int_map::Map( Void -> Void ),
oobd_callbacks: int_map::Map( Void -> Void )
};
initial_state
=
{ timeout => (NULL: Null_Or( time::Time )),
requests => ([]: List( winix__premicrothread::io::Ioplea )),
read_callbacks => (int_map::empty: int_map::Map( Void -> Void )),
write_callbacks => (int_map::empty: int_map::Map( Void -> Void )),
oobd_callbacks => (int_map::empty: int_map::Map( Void -> Void ))
};
# file__premicrothread is from
src/lib/std/src/posix/file--premicrothread.pkg # winix_base_text_file_io_driver_for_posix__premicrothread is from
src/lib/std/src/io/winix-base-text-file-io-driver-for-posix--premicrothread.pkg # winix_base_file_io_driver_for_posix_g__premicrothread is from
src/lib/std/src/io/winix-base-file-io-driver-for-posix-g--premicrothread.pkg fun input_stream_to_iod input_stream
=
{ fun bad ()
=
{ fil::say {. "input_stream_to_iod: Don't know how to find io_descriptor for this stream."; };
#
raise exception DIE "when";
};
stream = fil::get_instream input_stream;
reader_and_vector
=
fil::pur::get_reader stream;
case reader_and_vector
#
(winix_base_text_file_io_driver_for_posix__premicrothread::FILEREADER { io_descriptor => THE iod, ... }, _)
=>
iod;
_ => bad ();
esac;
};
fun binary_input_stream_to_iod (input_stream: data_file__premicrothread::Input_Stream)
=
{ fun bad ()
=
{ fil::say {. "binary_input_stream_to_iod: Don't know how to find io_descriptor for this stream."; };
#
raise exception DIE "when";
};
stream = data_file__premicrothread::get_instream input_stream;
reader_and_vector
=
data_file__premicrothread::pur::get_reader stream;
case reader_and_vector
#
(winix_base_data_file_io_driver_for_posix__premicrothread::FILEREADER { io_descriptor => THE iod, ... }, _)
=>
iod;
_ => bad ();
esac;
};
fun output_stream_to_iod output_stream
=
{ fun bad ()
=
{ fil::say {. "output_stream_to_iod: Don't know how to find io_descriptor for this stream."; };
#
raise exception DIE "when";
};
stream = fil::get_outstream output_stream;
writer_and_buffer
=
fil::pur::get_writer stream;
case writer_and_buffer
#
(winix_base_text_file_io_driver_for_posix__premicrothread::FILEWRITER { io_descriptor => THE iod, ... }, _)
=>
iod;
_ => bad ();
esac;
};
fun binary_output_stream_to_iod output_stream
=
{ fun bad ()
=
{ fil::say {. "binary_output_stream_to_iod: Don't know how to find io_descriptor for this stream."; };
#
raise exception DIE "when";
};
stream
=
data_file__premicrothread::get_outstream output_stream;
writer_and_buffer
=
data_file__premicrothread::pur::get_writer stream;
case writer_and_buffer
#
(winix_base_data_file_io_driver_for_posix__premicrothread::FILEWRITER { io_descriptor => THE io_descriptor, ... }, _)
=>
io_descriptor;
_ => bad ();
esac;
};
fun digest_rules ([], { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks } )
=>
{ timeout, requests => reverse requests, read_callbacks, write_callbacks, oobd_callbacks };
digest_rules (rule ! rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks })
=>
case rule
#
NONBLOCKING
=>
digest_rules (rules, { timeout => THE time::zero_time, requests, read_callbacks, write_callbacks, oobd_callbacks });
TIMEOUT_SECS secs
=>
digest_rules (rules, { timeout => THE (time::from_float_seconds secs), requests, read_callbacks, write_callbacks, oobd_callbacks });
FD_IS_READ_READY (fd, callback)
=>
{ i = psx::fd_to_int fd;
io_descriptor = psx::fd_to_iod fd;
requests = { io_descriptor,
readable => TRUE,
writable => FALSE,
oobdable => FALSE
}
!
requests;
read_callbacks = int_map::set (read_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
FD_IS_WRITE_READY (fd, callback)
=>
{ i = psx::fd_to_int fd;
io_descriptor = psx::fd_to_iod fd;
requests = { io_descriptor,
readable => FALSE,
writable => TRUE,
oobdable => FALSE
}
!
requests;
write_callbacks = int_map::set (write_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
FD_HAS_OOBD_READY (fd, callback)
=>
{ i = psx::fd_to_int fd;
io_descriptor = psx::fd_to_iod fd;
requests = { io_descriptor,
readable => FALSE,
writable => FALSE,
oobdable => TRUE
}
!
requests;
oobd_callbacks = int_map::set (oobd_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
IOD_IS_READ_READY (io_descriptor, callback)
=>
{ fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
requests = { io_descriptor,
readable => TRUE,
writable => FALSE,
oobdable => FALSE
}
!
requests;
read_callbacks = int_map::set (read_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
IOD_IS_WRITE_READY (io_descriptor, callback)
=>
{ fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
requests = { io_descriptor,
readable => FALSE,
writable => TRUE,
oobdable => FALSE
}
!
requests;
write_callbacks = int_map::set (write_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
IOD_HAS_OOBD_READY (io_descriptor, callback)
=>
{ fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
requests = { io_descriptor,
readable => FALSE,
writable => FALSE,
oobdable => TRUE
}
!
requests;
oobd_callbacks = int_map::set (oobd_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
STREAM_IS_READ_READY (stream, callback)
=>
{ io_descriptor = input_stream_to_iod stream;
fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
printf "src/lib/src/when.pkg: digest_rules: STREAM_IS_READ_READY: i d=%d\n" i;
requests = { io_descriptor,
readable => TRUE,
writable => FALSE,
oobdable => FALSE
}
!
requests;
read_callbacks = int_map::set (read_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
STREAM_IS_WRITE_READY (stream, callback)
=>
{ io_descriptor = output_stream_to_iod stream;
fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
requests = { io_descriptor,
readable => FALSE,
writable => TRUE,
oobdable => FALSE
}
!
requests;
printf "src/lib/src/when.pkg: digest_rules: STREAM_IS_WRITE_READY: i d=%d\n" i;
write_callbacks = int_map::set (write_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
BINARY_STREAM_IS_READ_READY (stream, callback)
=>
{ io_descriptor = binary_input_stream_to_iod stream;
fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
requests = { io_descriptor,
readable => TRUE,
writable => FALSE,
oobdable => FALSE
}
!
requests;
read_callbacks = int_map::set (read_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
BINARY_STREAM_IS_WRITE_READY (stream, callback)
=>
{ io_descriptor = binary_output_stream_to_iod stream;
fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
requests = { io_descriptor,
readable => FALSE,
writable => TRUE,
oobdable => FALSE
}
!
requests;
write_callbacks = int_map::set (write_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
SOCKET_IS_READ_READY (socket, callback)
=>
{ io_descriptor = socket__premicrothread::io_descriptor socket;
fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
requests = { io_descriptor,
readable => TRUE,
writable => FALSE,
oobdable => FALSE
}
!
requests;
read_callbacks = int_map::set (read_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
SOCKET_IS_WRITE_READY (socket, callback)
=>
{ io_descriptor = socket__premicrothread::io_descriptor socket;
fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
requests = { io_descriptor,
readable => FALSE,
writable => TRUE,
oobdable => FALSE
}
!
requests;
write_callbacks = int_map::set (write_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
SOCKET_HAS_OOBD_READY (socket, callback)
=>
{ io_descriptor = socket__premicrothread::io_descriptor socket;
fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
requests = { io_descriptor,
readable => FALSE,
writable => FALSE,
oobdable => TRUE
}
!
requests;
oobd_callbacks = int_map::set (oobd_callbacks, i, callback);
digest_rules (rules, { timeout, requests, read_callbacks, write_callbacks, oobd_callbacks });
};
esac;
end; # fun digest_rules;
fun do_oobds ([]: List( winix__premicrothread::io::Ioplea_Result ), state: State, n)
=>
n;
do_oobds ( poll_result ! poll_results, state, n)
=>
if poll_result.oobdable
#
i = psx::fd_to_int (psx::iod_to_fd poll_result.io_descriptor);
callback = the (int_map::get (state.oobd_callbacks, i));
callback ();
do_oobds ( poll_results, state, n + 1);
else
do_oobds ( poll_results, state, n );
fi;
end;
fun do_reads ([]: List( winix__premicrothread::io::Ioplea_Result ), state: State, n)
=>
n;
do_reads ( poll_result ! poll_results, state, n)
=>
{ ii = psx::fd_to_int (psx::iod_to_fd poll_result.io_descriptor);
rr = poll_result.readable ?? "TRUE" :: "FALSE";
ww = poll_result.writable ?? "TRUE" :: "FALSE";
oo = poll_result.oobdable ?? "TRUE" :: "FALSE";
printf "src/lib/src/when.pkg: do_reads: poll_result { io_descriptor => %d, readable => %s, writable => %s, oobdable => %s }\n" ii rr ww oo;
if poll_result.readable
#
i = psx::fd_to_int (psx::iod_to_fd poll_result.io_descriptor);
callback = the (int_map::get (state.read_callbacks, i));
print "src/lib/src/when.pkg: do_reads: invoking callback ()\n";
callback ();
print "src/lib/src/when.pkg: do_reads: back from callback ()\n";
do_reads ( poll_results, state, n + 1);
else
do_reads ( poll_results, state, n );
fi;
};
end;
fun do_writes ([]: List( winix__premicrothread::io::Ioplea_Result ), state: State, n)
=>
n;
do_writes ( poll_result ! poll_results, state, n)
=>
{ ii = psx::fd_to_int (psx::iod_to_fd poll_result.io_descriptor);
rr = poll_result.readable ?? "TRUE" :: "FALSE";
ww = poll_result.writable ?? "TRUE" :: "FALSE";
oo = poll_result.oobdable ?? "TRUE" :: "FALSE";
printf "src/lib/src/when.pkg: do_writes: poll_result { io_descriptor => %d, readable => %s, writable => %s, oobdable => %s }\n" ii rr ww oo;
if (poll_result.writable)
#
i = psx::fd_to_int (psx::iod_to_fd poll_result.io_descriptor);
callback = the (int_map::get (state.write_callbacks, i));
print "src/lib/src/when.pkg: do_writes: invoking callback ()\n";
callback ();
print "src/lib/src/when.pkg: do_writes: back from callback ()\n";
do_writes ( poll_results, state, n + 1);
else
do_writes ( poll_results, state, n );
fi;
};
end;
stipulate
fun print_timeout NULL => print " timeout == NULL;\n";
print_timeout (THE time) => printf " timeout == THE %f;\n" (time::to_float_seconds time);
end;
fun bool b
=
b ?? "TRUE" :: "FALSE";
fun print_requests []
=>
();
#
print_requests ({ io_descriptor, readable, writable, oobdable } ! requests)
=>
{ fd = psx::iod_to_fd io_descriptor;
i = psx::fd_to_int fd;
#
printf
" io_descriptor => %d readable => %s writable => %s oobdable => %s\n"
i
(bool readable)
(bool writable)
(bool oobdable);
#
print_requests requests;
};
end;
herein
fun print_poll_args (state: State)
=
{ print "\npoll args:\n";
print_timeout state.timeout;
print_requests state.requests;
};
fun print_poll_results poll_results
=
{ print "\npoll results:\n";
print_requests poll_results;
};
end;
herein
fun when rules
=
{ state = digest_rules (rules, initial_state);
print "src/lib/src/when.pkg: when:\n"; print_poll_args state;
poll_results = winix__premicrothread::io::wait_for_io_opportunity { wait_requests => state.requests, timeout => state.timeout };
print "src/lib/src/when.pkg: when:\n"; print_poll_results poll_results;
oobds_done = do_oobds (poll_results, state, 0);
reads_done = do_reads (poll_results, state, 0);
# To reduce the chance of deadlock
# we do writes only if we did no reads:
#
writes_done
=
if (reads_done == 0) do_writes (poll_results, state, 0);
else 0;
fi;
{ reads_done,
writes_done,
oobds_done
};
};
end;
};
end;
# Notes on select type operations:
# A high-level socket-specific select is implemented in:
#
src/lib/std/src/socket/socket--premicrothread.api#
src/lib/std/src/socket/socket-guts.pkg# This is built on top of:
# A low-level 'poll' implementation is implemented in:
#
src/lib/std/src/winix/winix-io--premicrothread.api#
src/lib/std/src/posix/winix-io--premicrothread.pkg# Notes on descriptor definitions and conversions:
#
src/lib/std/src/socket/proto-socket--premicrothread.pkg# Socket_Descriptor = winix_types::io::Iod;
#
src/lib/std/src/posix/winix-io--premicrothread.pkg# Iod = winix__premicrothread::io::Iod;
#
src/lib/std/src/psx/posix-file.api#
# fd_to_int: File_Descriptor -> host_int::Int;
# int_to_fd: host_int::Int -> File_Descriptor;
#
# fd_to_iod: File_Descriptor -> winix__premicrothread::io::Iod;
# iod_to_fd: winix__premicrothread::io::Iod -> Null_Or( File_Descriptor );
#
src/lib/std/src/psx/posix-file.pkg#
# File_Descriptor = FILE_DESCRIPTOR { fd: host_int::Int };
#
# fun fd_to_unt (FILE_DESCRIPTOR { fd, ... } ) = host_unt::from_int fd;
# fun unt_to_fd fd = FILE_DESCRIPTOR { fd => host_unt::to_int fd };
#
# # Conversions between winix__premicrothread::io::Iod values and Posix file descriptors.
# #
# fun fd_to_iod (FILE_DESCRIPTOR { fd, ... } ) = winix_types::io::IO_DESCRIPTOR fd;
# fun iod_to_fd (winix__premicrothread::io::IO_DESCRIPTOR fd) = THE (FILE_DESCRIPTOR { fd } );
#
src/lib/std/src/posix/winix-types.pkg# Iod = IO_DESCRIPTOR Int;
#
#
src/lib/std/src/io/winix-base-file-io-driver-for-posix-g--premicrothread.pkg#
#
src/lib/std/src/socket/proto-socket--premicrothread.pkg# Socket_Fd = Int;
#
src/lib/std/src/socket/plain-socket--premicrothread.pkg# fun fd2sock file_descriptor = s::SOCKET { file_descriptor, nonblocking => REF FALSE };
#
src/lib/std/src/io/winix-text-file-for-os--premicrothread.api# make_instream: pure_io::Input_Stream -> Input_Stream;
# get_instream: Input_Stream -> pure_io::Input_Stream;
# S I F W
# Socket_Descriptor .
# Iod . D
# File_Descriptor C . A
# int E B .
# A
src/lib/std/src/psx/posix-file.pkg fd_to_int
# B
src/lib/std/src/psx/posix-file.pkg int_to_fd
# C
src/lib/std/src/psx/posix-file.pkg fd_to_iod
# D
src/lib/std/src/psx/posix-file.pkg iod_to_fd
# E (nonexistent:) nonblocking-socket-junk.pkg make_io_descriptor
## Copyright (c) 2008 Jeffrey S Prothero
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2015,
## released per terms of SMLNJ-COPYRIGHT.