Duncan Coutts pushed to branch wip/io-manager-deadlock-detection at Glasgow Haskell Compiler / GHC
WARNING: The push did not contain any new commits, but force pushed to delete the commits and changes below.
Deleted commits:
e9c4d16c by Duncan Coutts at 2026-02-16T00:30:50+00:00
Free per-cap I/O managers during shutdown and forkProcess
Historically this was not strictly necessary. The select and win32
legacy I/O managers did not maintain any dynamically allocated
resources. The new poll one does (an auxillary table), and so this
should be freed.
After forkProcess, all threads get deleted. This includes threads
waiting on I/O or timers. So as of this patch, resetting the I/O
manager is just about tidying things up. For example, for the poll
I/O manager this will reset the size of the AIOP table (which
otherwise grows but never shrinks).
In future however the re-initialising will become neeecessary for
functionality, since some I/O managers will need to re-initialise
wakeup fds that are set CLOEXEC.
- - - - -
fcc1cdd3 by Duncan Coutts at 2026-02-16T00:30:50+00:00
Add an FdWakup module for posix I/O managers
This will be used to implement wakeupIOManager for in-RTS I/O managers.
It provides a notification/wakeup mechanism using FDs, suitable for
situations when I/O managers are blocked on a set of fds anyway.
- - - - -
4e4aa84f by Duncan Coutts at 2026-02-16T00:30:50+00:00
Add wakeupIOManager support for select I/O manager
Uses the FdWakup mechanism.
- - - - -
6fe60774 by Duncan Coutts at 2026-02-16T00:30:50+00:00
Add wakeupIOManager support for poll I/O manager
Uses the FdWakup mechanism.
A quirk we have to cope with is that we now need to poll one more fd --
the wakeup_fd_r -- but this fd has no corresponding entry in the
aiop_table. This is awkward since we have set up our aiop_poll_table to
be an auxilliary table with matching indicies.
The solution this patch uses (and described in the comments) is to have
two tables: struct pollfd *aiop_poll_table, *full_poll_table;
and to have the aiop_poll_table alias the tail of the full_poll_table.
The head entry in the full_poll_table is the extra fd. So we poll the
full_poll_table, while the aiop_poll_table still has matching indicies
with the aiop_table.
Hurrah for C aliasing rules.
- - - - -
24b6f621 by Duncan Coutts at 2026-02-16T00:30:50+00:00
Add wakeupIOManager support for win32 legacy I/O manager
- - - - -
bc97a19f by Duncan Coutts at 2026-02-16T00:30:50+00:00
wakeupIOManager is now required for all I/O managers
We are going to rely on it. Previously it could be a no-op. Update the
docs in the header file.
Also, temporarily disable awaitCompletedTimeoutsOrIO post-condition
assertion. It will become more complicated due to wakeupIOManager, and
it's not yet clear how to express it.
We will re-introduce a post condition after a few more changes.
- - - - -
12 changed files:
- rts/Capability.c
- rts/IOManager.c
- rts/IOManager.h
- rts/IOManagerInternals.h
- rts/Schedule.c
- + rts/posix/FdWakeup.c
- + rts/posix/FdWakeup.h
- rts/posix/Poll.c
- rts/posix/Poll.h
- rts/posix/Select.c
- rts/posix/Select.h
- rts/rts.cabal
Changes:
=====================================
rts/Capability.c
=====================================
@@ -1280,6 +1280,7 @@ shutdownCapabilities(Task *task, bool safe)
static void
freeCapability (Capability *cap)
{
+ freeCapabilityIOManager(cap->iomgr);
stgFree(cap->mut_lists);
stgFree(cap->saved_mut_lists);
if (cap->current_segments) {
=====================================
rts/IOManager.c
=====================================
@@ -343,9 +343,7 @@ void initCapabilityIOManager(CapIOManager *iomgr)
switch (iomgr_type) {
#if defined(IOMGR_ENABLED_SELECT)
case IO_MANAGER_SELECT:
- iomgr->blocked_queue_hd = END_TSO_QUEUE;
- iomgr->blocked_queue_tl = END_TSO_QUEUE;
- iomgr->sleeping_queue = END_TSO_QUEUE;
+ initCapabilityIOManagerSelect(iomgr);
break;
#endif
@@ -373,6 +371,26 @@ void initCapabilityIOManager(CapIOManager *iomgr)
}
+void freeCapabilityIOManager(CapIOManager *iomgr)
+{
+ switch (iomgr_type) {
+#if defined(IOMGR_ENABLED_SELECT)
+ case IO_MANAGER_SELECT:
+ freeCapabilityIOManagerSelect(iomgr);
+ break;
+#endif
+
+#if defined(IOMGR_ENABLED_POLL)
+ case IO_MANAGER_POLL:
+ freeCapabilityIOManagerPoll(iomgr);
+ break;
+#endif
+ default:
+ break;
+ }
+}
+
+
/* Called late in the RTS initialisation
*/
void startIOManager(void)
@@ -546,8 +564,21 @@ exitIOManager(bool wait_threads)
*/
void wakeupIOManager(void)
{
+ debugTrace(DEBUG_iomanager, "Sending wakeup to I/O manager...");
switch (iomgr_type) {
+#if defined(IOMGR_ENABLED_SELECT)
+ case IO_MANAGER_SELECT:
+ wakeupIOManagerSelect(MainCapability.iomgr);
+ break;
+#endif
+
+#if defined(IOMGR_ENABLED_POLL)
+ case IO_MANAGER_POLL:
+ wakeupIOManagerPoll(MainCapability.iomgr);
+ break;
+#endif
+
#if defined(IOMGR_ENABLED_MIO_POSIX)
case IO_MANAGER_MIO_POSIX:
/* MIO Posix implementation in posix/Signals.c */
@@ -572,8 +603,13 @@ void wakeupIOManager(void)
#endif
break;
#endif
- default:
+#if defined(IOMGR_ENABLED_WIN32_LEGACY)
+ case IO_MANAGER_WIN32_LEGACY:
+ abandonRequestWait();
break;
+#endif
+ default:
+ barf("wakeupIOManager not implemented");
}
}
@@ -782,7 +818,9 @@ void awaitCompletedTimeoutsOrIO(CapIOManager *iomgr)
default:
barf("pollCompletedTimeoutsOrIO not implemented");
}
- ASSERT(!emptyRunQueue(iomgr->cap) || getSchedState() != SCHED_RUNNING);
+ // FIXME: the post condition is now more complicated. Await can now simply
+ // be interrupted by wakeupIOManager.
+ // ASSERT(!emptyRunQueue(iomgr->cap) || getSchedState() != SCHED_RUNNING);
}
=====================================
rts/IOManager.h
=====================================
@@ -242,6 +242,28 @@ CapIOManager *allocCapabilityIOManager(Capability *cap);
*/
void initCapabilityIOManager(CapIOManager *iomgr);
+/* When shutting down a capability, or after forkProcess, free the resources
+ * held by a CapIOManager to put it back into a state in which either it can be
+ * re-initialised using initCapabilityIOManager, or the whole structure freed.
+ *
+ * Note that this does not free the CapIOManager structure itself, just the
+ * contents.
+ *
+ * This is used during capability shutdown, during RTS shutdown. It is not used
+ * when reducing the number of capabilities. Capabilities are disabled rather
+ * than freed entirely: the I/O manager keeps running but threads that become
+ * runnable are migrated away.
+ *
+ * It is also used after forkProcess.
+ */
+void freeCapabilityIOManager(CapIOManager *iomgr);
+
+/* CapIOManager life cycle:
+ *
+ * alloc -> init -> free -> free struct
+ * ^ |
+ * +--------+
+ */
/* Init hook: called from hs_init_ghc, very late in the startup after almost
* everything else is done.
@@ -283,19 +305,13 @@ void stopIOManager(void);
void exitIOManager(bool wait_threads);
-/* Wakeup hook: called from the scheduler's wakeUpRts (currently only in
- * threaded mode).
+/* Wakeup hook: called from the scheduler's wakeUpRts().
*
* The I/O manager can be blocked waiting on I/O or timers. Sometimes there are
* other external events where we need to wake up the I/O manager and return
- * to the schedulr.
- *
- * At the moment, all the non-threaded I/O managers will do this automagically
- * since a signal will interrupt any waiting system calls, so at the moment
- * the implementation for the non-threaded I/O managers does nothing.
+ * to the scheduler.
*
- * For the I/O managers in threaded mode, this arranges to unblock the I/O
- * manager if it waa blocked waiting.
+ * This arranges to unblock the I/O manager if it was blocked waiting.
*/
void wakeupIOManager(void);
=====================================
rts/IOManagerInternals.h
=====================================
@@ -46,6 +46,11 @@ struct _CapIOManager {
StgTSO *sleeping_queue;
#endif
+#if defined(IOMGR_ENABLED_SELECT) || defined(IOMGR_ENABLED_POLL)
+ /* FDs for waking up the I/O manager when it is blocked waiting */
+ int wakeup_fd_r, wakeup_fd_w;
+#endif
+
#if defined(IOMGR_ENABLED_POLL)
/* AIOP and timeout collections shared by several I/O manager impls */
ClosureTable aiop_table;
@@ -53,8 +58,11 @@ struct _CapIOManager {
#endif
#if defined(IOMGR_ENABLED_POLL)
- /* Auxiliary table with size and indexes matching the aiop_table */
- struct pollfd *aiop_poll_table;
+ /* Auxiliary table with size and indexes matching the aiop_table. This is
+ * aliased to the tail of the full poll table, which has a head entry for
+ * the wakeup_fd_r above, so we can also poll that fd.
+ */
+ struct pollfd *aiop_poll_table, *full_poll_table;
#endif
#if defined(IOMGR_ENABLED_WIN32_LEGACY)
=====================================
rts/Schedule.c
=====================================
@@ -2165,6 +2165,15 @@ forkProcess(HsStablePtr *entry
// exist.
truncateRunQueue(cap);
+ // Reset and re-initialise the capability's I/O manager,
+ // to get the I/O manager ready again.
+ //
+ // Any threads waiting on I/O or timers should have been
+ // removed from I/O manager queues by deleteThread_ above.
+ // TODO: but we could assert that here.
+ freeCapabilityIOManager(cap->iomgr);
+ initCapabilityIOManager(cap->iomgr);
+
// Any suspended C-calling Tasks are no more, their OS threads
// don't exist now:
cap->suspended_ccalls = NULL;
@@ -2309,6 +2318,10 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
// the capability; we don't have to worry about GC data
// structures, the nursery, etc.
//
+ // This approach also handles threads blocked on I/O. Such threads
+ // remain blocked, and when I/O completes and threads become runnable
+ // then they are migrated away.
+ //
for (n = new_n_capabilities; n < enabled_capabilities; n++) {
getCapability(n)->disabled = true;
traceCapDisable(getCapability(n));
=====================================
rts/posix/FdWakeup.c
=====================================
@@ -0,0 +1,132 @@
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team 2025
+ *
+ * Utilities for a simple fd-based cross-thread wakeup mechanism.
+ *
+ * This is used in I/O managers, to provide a mechanism to wake them when they
+ * are blocked waiting on fds and timeouts. The mechanism works by including
+ * the read end fd into the set of fds the I/O manager waits on, and when a
+ * wake up is needed, the write end fd is used.
+ *
+ * This is implemented using either eventfd() or pipe().
+ *
+ * Linux 2.6.22+ and FreeBSD 13+ support eventfd. It is a single fd with a
+ * 64bit counter. It uses less resources than a pipe, and is probably a tad
+ * faster. Using write() adds to the counter, while read() reads and resets
+ * it. This gives us event combining.
+ *
+ * Otherwise we use a classic unix pipe.
+ *
+ * -------------------------------------------------------------------------*/
+
+#include "rts/PosixSource.h"
+#include "Rts.h"
+
+#include "FdWakeup.h"
+
+#include
+#include
+
+#ifdef HAVE_SYS_EVENTFD_H
+#include
+#endif
+
+#if !defined(HAVE_EVENTFD) \
+ || (defined(HAVE_EVENTFD) && !(defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)))
+static void fcntl_CLOEXEC_NONBLOCK(int fd)
+{
+ int res1 = fcntl(fd, F_SETFD, FD_CLOEXEC);
+ int res2 = fcntl(fd, F_SETFL, O_NONBLOCK);
+ if (RTS_UNLIKELY(res1 < 0 || res2 < 0)) {
+ sysErrorBelch("newFdWakeup fcntl()");
+ stg_exit(EXIT_FAILURE);
+ }
+}
+#endif
+
+void newFdWakeup(int *wakeup_fd_r, int *wakeup_fd_w)
+{
+#if defined(HAVE_EVENTFD)
+ int wakeup_fd;
+#if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)
+ wakeup_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+#else
+ wakeup_fd = eventfd(0, 0);
+ if (wakeup_fd >= 0) fcntl_CLOEXEC_NONBLOCK(wakeup_fd);
+#endif
+ if (RTS_UNLIKELY(wakeup_fd < 0)) {
+ sysErrorBelch("newFdWakeup eventfd()");
+ stg_exit(EXIT_FAILURE);
+ }
+ /* eventfd uses the same fd for each end */
+ *wakeup_fd_r = wakeup_fd;
+ *wakeup_fd_w = wakeup_fd;
+#else
+ int pipefd[2];
+ int res;
+ res = pipe(pipefd);
+ if (RTS_UNLIKELY(res < 0)) {
+ sysErrorBelch("newFdWakeup pipe");
+ stg_exit(EXIT_FAILURE);
+ }
+ fcntl_CLOEXEC_NONBLOCK(pipefd[0]);
+ fcntl_CLOEXEC_NONBLOCK(pipefd[1]);
+ *wakeup_fd_r = pipefd[0]; /* read end */
+ *wakeup_fd_w = pipefd[1]; /* write end */
+#endif
+}
+
+void closeFdWakeup(int wakeup_fd_r, int wakeup_fd_w)
+{
+#if defined(HAVE_EVENTFD)
+ ASSERT(wakeup_fd_r == wakeup_fd_w);
+ close(wakeup_fd_r);
+#else
+ ASSERT(wakeup_fd_r != wakeup_fd_w);
+ close(wakeup_fd_r);
+ close(wakeup_fd_w);
+#endif
+}
+
+void sendFdWakeup(int wakeup_fd_w)
+{
+ int res;
+#if defined(HAVE_EVENTFD)
+ uint64_t val = 1;
+ res = write(wakeup_fd_w, &val, 8);
+#else
+ unsigned char buf = 1;
+ res = write(wakeup_fd_w, &buf, 1);
+#endif
+ if (RTS_UNLIKELY(res < 0)) {
+ /* Unlikely the pipe buffer will fill, but it would not be an error. */
+ if (errno == EAGAIN) return;
+ sysErrorBelch("sendFdWakeup write");
+ stg_exit(EXIT_FAILURE);
+ }
+}
+
+void collectFdWakeup(int wakeup_fd_r)
+{
+ int res;
+#if defined(HAVE_EVENTFD)
+ uint64_t buf;
+ /* eventfd combines events into one counter, so a single read is enough */
+ res = read(wakeup_fd_r, &buf, 8);
+#else
+ /* Drain the pipe buffer. Multiple wakeup notifications could
+ * have been sent before we have a chance to collect them.
+ */
+ uint64_t buf;
+ do {
+ res = read(wakeup_fd_r, &buf, 8);
+ } while (res == 8);
+#endif
+ if (RTS_UNLIKELY(res < 0)) {
+ /* After the first pipe read, it could block */
+ if (errno == EAGAIN) return;
+ sysErrorBelch("collectFdWakeup read");
+ stg_exit(EXIT_FAILURE);
+ }
+}
=====================================
rts/posix/FdWakeup.h
=====================================
@@ -0,0 +1,27 @@
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team 2025
+ *
+ * Utilities for a simple fd-based cross-thread wakeup mechanism.
+ *
+ * This is used in I/O managers, to provide a mechanism to wake them when they
+ * are blocked waiting on fds and timeouts. The mechanism works by including
+ * the read end fd into the set of fds the I/O manager waits on, and when a
+ * wake up is needed, the write end fd is used.
+ *
+ * Prototypes for functions in FdWakeup.c
+ *
+ * -------------------------------------------------------------------------*/
+
+#pragma once
+
+#include "BeginPrivate.h"
+
+void newFdWakeup(int *fd_r, int *fd_w);
+void closeFdWakeup(int fd_r, int fd_w);
+
+void sendFdWakeup(int fd_w);
+void collectFdWakeup(int fd_r);
+
+#include "EndPrivate.h"
+
=====================================
rts/posix/Poll.c
=====================================
@@ -41,6 +41,7 @@
#include "IOManagerInternals.h"
#include "Timeout.h"
+#include "FdWakeup.h"
/******************************************************************************
@@ -107,8 +108,9 @@ timeout (if any) as the poll() timeout parameter.
The CapIOManager structure for this I/O manager contains:
ClosureTable aiop_table;
- struct pollfd *aiop_poll_table;
+ struct pollfd *aiop_poll_table, *full_poll_table;
StgTimeoutQueue *timeout_queue;
+ int wakeup_fd_r, wakeup_fd_w;
We also support the Linux-specific ppoll API which supports higher resolution
time delays -- nanoseconds rather than milliseconds as in classic poll(). It
@@ -117,6 +119,15 @@ also allows the signal mask to be adjusted, but we do not make use of this.
int ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *tmo_p, const sigset_t *sigmask);
+We have both aiop_poll_table and full_poll_table. This is to cope with needing
+to wait on the special extra file descriptor wakeup_fd_r. This fd is used to
+support waking the I/O manager when we are blocked in a poll call. This
+requires waiting on an extra fd that has no corresponding entry in the
+aiop_table. To manage this quirk, we alias the aiop_poll_table to be the tail
+of the full_poll_table and have the first entry of the full_poll_table be the
+wakeup_fd_r. This means the aiop_poll_table indicies match up exactly with the
+aiop_table, but still allows the full_poll_table to have an extra entry.
+
******************************************************************************/
/* Forward declarations */
@@ -129,8 +140,31 @@ static void reportPollError(int res, nfds_t nfds) STG_NORETURN;
void initCapabilityIOManagerPoll(CapIOManager *iomgr)
{
initClosureTable(&iomgr->aiop_table, ClosureTableCompact);
- iomgr->aiop_poll_table = NULL;
iomgr->timeout_queue = emptyTimeoutQueue();
+
+ newFdWakeup(&iomgr->wakeup_fd_r, &iomgr->wakeup_fd_w);
+
+ iomgr->full_poll_table = stgMallocBytes(sizeof(struct pollfd) /* size 1 */,
+ "initCapabilityIOManagerPoll");
+ iomgr->full_poll_table[0] = (struct pollfd) {
+ .fd = iomgr->wakeup_fd_r,
+ .events = POLLIN,
+ .revents = 0
+ };
+ iomgr->aiop_poll_table = iomgr->full_poll_table+1; /* hence empty */
+}
+
+
+void freeCapabilityIOManagerPoll(CapIOManager *iomgr)
+{
+ stgFree(iomgr->full_poll_table);
+ closeFdWakeup(iomgr->wakeup_fd_r, iomgr->wakeup_fd_w);
+}
+
+
+void wakeupIOManagerPoll(CapIOManager *iomgr)
+{
+ sendFdWakeup(iomgr->wakeup_fd_w);
}
@@ -275,7 +309,7 @@ static void notifyIOCompletion(CapIOManager *iomgr, StgAsyncIOOp *aiop)
}
-static void processIOCompletions(CapIOManager *iomgr, int ncompletions)
+static bool processIOCompletions(CapIOManager *iomgr, int ncompletions)
{
/* The scheme we use with poll is that we have a dense poll table, and a
* corresponding table that maps to the closure table index. The poll
@@ -285,6 +319,19 @@ static void processIOCompletions(CapIOManager *iomgr, int ncompletions)
*/
debugTrace(DEBUG_iomanager, "processIOCompletions(ncompletions = %d)",
ncompletions);
+
+ bool wakeup;
+ /* If the wakeup_fd_r is ready, collect it */
+ if (iomgr->full_poll_table[0].revents) {
+ ASSERT(iomgr->full_poll_table[0].fd == iomgr->wakeup_fd_r);
+ collectFdWakeup(iomgr->wakeup_fd_r);
+ ncompletions--;
+ wakeup = true;
+ debugTrace(DEBUG_iomanager, "Received wakeup in poll I/O manager.");
+ } else {
+ wakeup = false;
+ }
+
struct pollfd *aiop_poll_table = iomgr->aiop_poll_table;
int n = ncompletions;
int i = 0;
@@ -337,11 +384,14 @@ static void processIOCompletions(CapIOManager *iomgr, int ncompletions)
i++;
}
}
+ return wakeup;
}
void pollCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
{
+ ASSERT(iomgr->aiop_poll_table == iomgr->full_poll_table+1);
+
if (!isEmptyTimeoutQueue(iomgr->timeout_queue)) {
Time now = getProcessElapsedTime();
processTimeoutCompletions(iomgr, now);
@@ -349,20 +399,20 @@ void pollCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
if (!isEmptyClosureTable(&iomgr->aiop_table)) {
- nfds_t nfds = sizeClosureTable(&iomgr->aiop_table);
+ nfds_t nfds = sizeClosureTable(&iomgr->aiop_table) + 1;
/* Poll for I/O readiness, without waiting. */
#if defined(HAVE_DECL_PPOLL) && HAVE_DECL_PPOLL == 1
/* We could use poll here, since we use no timeout, but for
consistency we use the same syscall as at the other call site. */
struct timespec tv = (struct timespec) { .tv_sec = 0, .tv_nsec = 0 };
- int res = ppoll(iomgr->aiop_poll_table, nfds, &tv, NULL);
+ int res = ppoll(iomgr->full_poll_table, nfds, &tv, NULL);
debugTrace(DEBUG_iomanager,
"ppoll(nfds = %d, timeout.sec = 0, timeout.nsec = 0) = %d",
nfds, res);
#else
- int res = poll(iomgr->aiop_poll_table, nfds, 0);
+ int res = poll(iomgr->full_poll_table, nfds, 0);
debugTrace(DEBUG_iomanager,
"poll(nfds = %d, timeout_ms = 0) = %d",
@@ -390,6 +440,10 @@ void pollCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
{
+ bool wakeup = false; /* got woken up via wakeupIOManager */
+
+ ASSERT(iomgr->aiop_poll_table == iomgr->full_poll_table+1);
+
/* Loop until we've woken up some threads. This loop is needed because the
* poll() timing isn't accurate, we sometimes sleep for a while but not
* long enough to wake up a thread in a threadDelay. Or we may need to
@@ -422,9 +476,9 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
#endif
/* Check for I/O readiness, possibly waiting. */
- nfds_t nfds = sizeClosureTable(&iomgr->aiop_table);
+ nfds_t nfds = sizeClosureTable(&iomgr->aiop_table) + 1;
#if defined(HAVE_DECL_PPOLL) && HAVE_DECL_PPOLL == 1
- int res = ppoll(iomgr->aiop_poll_table, nfds, timeout_ns, NULL);
+ int res = ppoll(iomgr->full_poll_table, nfds, timeout_ns, NULL);
debugTrace(DEBUG_iomanager,
"ppoll(nfds = %d, timeout.sec = %d, timeout.nsec = %d) = %d",
@@ -432,7 +486,7 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
timeout_ns == NULL ? 0 : timeout_ns->tv_nsec,
res);
#else
- int res = poll(iomgr->aiop_poll_table, nfds, timeout_ms);
+ int res = poll(iomgr->full_poll_table, nfds, timeout_ms);
debugTrace(DEBUG_iomanager,
"poll(nfds = %d, timeout_ms = %d) = %d",
@@ -454,7 +508,7 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
} else if (res > 0) {
int ncompletions = res;
ASSERT(ncompletions <= (int)nfds);
- processIOCompletions(iomgr, ncompletions);
+ wakeup = processIOCompletions(iomgr, ncompletions);
} else if (errno == EINTR) {
/* We got interrupted by a signal. In the non-threaded RTS, if the
@@ -479,6 +533,7 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
}
} while (emptyRunQueue(iomgr->cap)
+ && !wakeup
&& (getSchedState() == SCHED_RUNNING));
}
@@ -508,13 +563,17 @@ static bool enlargeTables(CapIOManager *iomgr)
bool ok = enlargeClosureTable(iomgr->cap, &iomgr->aiop_table, newcapacity);
if (RTS_UNLIKELY(!ok)) return false;
- /* Update the auxiliary aiop_poll_table to match */
- struct pollfd *aiop_poll_table;
- aiop_poll_table = stgReallocBytes(iomgr->aiop_poll_table,
- sizeof(struct pollfd) * newcapacity,
- "Poll.c: enlargeTables");
- iomgr->aiop_poll_table = aiop_poll_table;
+ /* Update the auxiliary aiop_poll_table to match. The full_poll_table is
+ * one bigger than the aiop_poll_table, since it has an extra entry at the
+ * front for wakeup_fd_r, with no corresponding aiop. */
+ iomgr->full_poll_table =
+ stgReallocBytes(iomgr->full_poll_table,
+ sizeof(struct pollfd) * (newcapacity+1),
+ "Poll.c: enlargeTables");
+ iomgr->aiop_poll_table = iomgr->full_poll_table+1;
+
/* Initialise the new part of the aiop_poll_table */
+ struct pollfd *aiop_poll_table = iomgr->aiop_poll_table;
for (int i = oldcapacity; i < newcapacity; i++) {
aiop_poll_table[i] = (struct pollfd) {
.fd = -1,
=====================================
rts/posix/Poll.h
=====================================
@@ -17,6 +17,8 @@
#if defined(IOMGR_ENABLED_POLL)
void initCapabilityIOManagerPoll(CapIOManager *iomgr);
+void freeCapabilityIOManagerPoll(CapIOManager *iomgr);
+void wakeupIOManagerPoll(CapIOManager *iomgr);
/* Synchronous I/O and timer operations */
bool syncIOWaitReadyPoll(CapIOManager *iomgr, StgTSO *tso,
=====================================
rts/posix/Select.c
=====================================
@@ -22,6 +22,7 @@
#include "IOManagerInternals.h"
#include "Stats.h"
#include "GetTime.h"
+#include "FdWakeup.h"
# if defined(HAVE_SYS_SELECT_H)
# include
@@ -54,6 +55,25 @@
#define TimeToLowResTimeRoundUp(t) (t)
#endif
+void initCapabilityIOManagerSelect(CapIOManager *iomgr)
+{
+ iomgr->blocked_queue_hd = END_TSO_QUEUE;
+ iomgr->blocked_queue_tl = END_TSO_QUEUE;
+ iomgr->sleeping_queue = END_TSO_QUEUE;
+
+ newFdWakeup(&iomgr->wakeup_fd_r, &iomgr->wakeup_fd_w);
+}
+
+void freeCapabilityIOManagerSelect(CapIOManager *iomgr)
+{
+ closeFdWakeup(iomgr->wakeup_fd_r, iomgr->wakeup_fd_w);
+}
+
+void wakeupIOManagerSelect(CapIOManager *iomgr)
+{
+ sendFdWakeup(iomgr->wakeup_fd_w);
+}
+
/*
* Return the time since the program started, in LowResTime,
* rounded down.
@@ -225,6 +245,7 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
bool seen_bad_fd = false;
struct timeval tv, *ptv;
LowResTime now;
+ bool wakeup = false; /* got woken up via wakeupIOManager */
IF_DEBUG(scheduler,
debugBelch("scheduler: checking for threads blocked on I/O");
@@ -252,6 +273,13 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
FD_ZERO(&rfd);
FD_ZERO(&wfd);
+ /* We're always interested in our wakeup fd */
+ {
+ int fd = iomgr->wakeup_fd_r;
+ maxfd = (fd > maxfd) ? fd : maxfd;
+ FD_SET(fd, &rfd);
+ }
+
for(tso = iomgr->blocked_queue_hd;
tso != END_TSO_QUEUE;
tso = next) {
@@ -376,6 +404,13 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
}
}
+ /* If the wakeup_fd_r is ready, collect it */
+ if (FD_ISSET(iomgr->wakeup_fd_r, &rfd)) {
+ collectFdWakeup(iomgr->wakeup_fd_r);
+ wakeup = true;
+ debugTrace(DEBUG_iomanager, "Received wakeup in select I/O manager.");
+ }
+
/* Step through the waiting queue, unblocking every thread that now has
* a file descriptor in a ready state.
*/
@@ -458,7 +493,8 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
}
} while (wait && getSchedState() == SCHED_RUNNING
- && emptyRunQueue(iomgr->cap));
+ && emptyRunQueue(iomgr->cap)
+ && !wakeup);
}
#endif /* IOMGR_ENABLED_SELECT */
=====================================
rts/posix/Select.h
=====================================
@@ -15,6 +15,10 @@ typedef StgWord LowResTime;
LowResTime getDelayTarget (HsInt us);
+void initCapabilityIOManagerSelect(CapIOManager *iomgr);
+void freeCapabilityIOManagerSelect(CapIOManager *iomgr);
+void wakeupIOManagerSelect(CapIOManager *iomgr);
+
void awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait);
#include "EndPrivate.h"
=====================================
rts/rts.cabal
=====================================
@@ -569,6 +569,7 @@ library
wasm/OSThreads.c
wasm/JSFFI.c
wasm/JSFFIGlobals.c
+ posix/FdWakeup.c
posix/Select.c
posix/Poll.c
posix/Timeout.c
@@ -581,6 +582,7 @@ library
posix/Ticker.c
posix/OSMem.c
posix/OSThreads.c
+ posix/FdWakeup.c
posix/MIO.c
posix/Poll.c
posix/Select.c
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/24a403ed5992b638b5f08b99a8fa588...
--
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/24a403ed5992b638b5f08b99a8fa588...
You're receiving this email because of your account on gitlab.haskell.org.