Duncan Coutts pushed to branch wip/io-manager-deadlock-detection at Glasgow Haskell Compiler / GHC
Commits:
1136fd5b by Duncan Coutts at 2026-02-16T00:30:48+00:00
Split posix/MIO.c out of posix/Signals.c
The MIO I/O manager was secretly living inside the Signals file.
Now it gets its own file, like any other self-respecting I/O manager.
- - - - -
ec170e68 by Duncan Coutts at 2026-02-16T00:30:50+00:00
Rationalise some scheduler run queue utilities
Move them all to the same place in the file.
Make some static that were used only internally.
Also remove a redundant assignment after calling truncateRunQueue that
is already done within truncateRunQueue.
- - - - -
c4312136 by Duncan Coutts at 2026-02-16T00:30:50+00:00
Rename initIOManager{AfterFork} to {re}startIOManager
These are more accurate names, since these actions happen after
initialisation and are really about starting (or restarting) background
threads.
- - - - -
24a403ed by Duncan Coutts at 2026-02-16T00:30:50+00:00
Add a TODO to the MIO I/O manager
The direction of travel is to make I/O managers per-capability and have
all their state live in the struct CapIOManager. The MIO I/O manager
however still has a number of global variables.
It's not obvious how to handle these globals, however.
- - - - -
e9c4d16c by Duncan Coutts at 2026-02-16T00:30:50+00:00
Free per-cap I/O managers during shutdown and forkProcess
Historically this was not strictly necessary. The select and win32
legacy I/O managers did not maintain any dynamically allocated
resources. The new poll one does (an auxiliary table), and so this
should be freed.
After forkProcess, all threads get deleted. This includes threads
waiting on I/O or timers. So as of this patch, resetting the I/O
manager is just about tidying things up. For example, for the poll
I/O manager this will reset the size of the AIOP table (which
otherwise grows but never shrinks).
In future, however, re-initialising will become necessary for
functionality, since some I/O managers will need to re-initialise
wakeup fds that are set CLOEXEC.
- - - - -
fcc1cdd3 by Duncan Coutts at 2026-02-16T00:30:50+00:00
Add an FdWakeup module for posix I/O managers
This will be used to implement wakeupIOManager for in-RTS I/O managers.
It provides a notification/wakeup mechanism using FDs, suitable for
situations when I/O managers are blocked on a set of fds anyway.
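
As a rough illustration of the mechanism described in this commit message, here is a minimal sketch of the pipe-based variant. The type `Wakeup` and the `wakeup_*` function names are hypothetical and chosen for this example only; they are not the RTS API, and error handling is reduced to return codes rather than the RTS's own error reporting.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stdbool.h>
#include <unistd.h>

/* Hypothetical type for this sketch: the two ends of a wakeup pipe. */
typedef struct { int fd_r, fd_w; } Wakeup;

/* Create a non-blocking, close-on-exec pipe. Returns 0 on success. */
static int wakeup_new(Wakeup *w)
{
    int fds[2];
    if (pipe(fds) < 0) return -1;
    for (int i = 0; i < 2; i++) {
        if (fcntl(fds[i], F_SETFD, FD_CLOEXEC) < 0 ||
            fcntl(fds[i], F_SETFL, O_NONBLOCK) < 0) {
            close(fds[0]);
            close(fds[1]);
            return -1;
        }
    }
    w->fd_r = fds[0]; /* read end: added to the set of polled fds */
    w->fd_w = fds[1]; /* write end: used to send a wakeup */
    return 0;
}

/* Send a wakeup. A full pipe (EAGAIN) is not an error: a wakeup is
 * already pending in that case. */
static int wakeup_send(const Wakeup *w)
{
    unsigned char b = 1;
    if (write(w->fd_w, &b, 1) < 0 && errno != EAGAIN) return -1;
    return 0;
}

/* Drain all pending wakeups so the read end stops reporting ready. */
static void wakeup_collect(const Wakeup *w)
{
    unsigned char buf[64];
    while (read(w->fd_r, buf, sizeof buf) > 0) { }
}

/* Wait up to timeout_ms; returns true if a wakeup is pending. */
static bool wakeup_wait(const Wakeup *w, int timeout_ms)
{
    struct pollfd p = { .fd = w->fd_r, .events = POLLIN, .revents = 0 };
    int res = poll(&p, 1, timeout_ms);
    return res > 0 && (p.revents & POLLIN) != 0;
}
```

In a real I/O manager the read end would be polled alongside the fds of pending I/O requests rather than on its own, so that a single blocking call can be interrupted by a write from another thread.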
- - - - -
4e4aa84f by Duncan Coutts at 2026-02-16T00:30:50+00:00
Add wakeupIOManager support for select I/O manager
Uses the FdWakeup mechanism.
- - - - -
6fe60774 by Duncan Coutts at 2026-02-16T00:30:50+00:00
Add wakeupIOManager support for poll I/O manager
Uses the FdWakeup mechanism.
A quirk we have to cope with is that we now need to poll one more fd --
the wakeup_fd_r -- but this fd has no corresponding entry in the
aiop_table. This is awkward since we have set up our aiop_poll_table to
be an auxiliary table with matching indices.
The solution this patch uses (and describes in the comments) is to have
two tables: struct pollfd *aiop_poll_table, *full_poll_table;
and to have the aiop_poll_table alias the tail of the full_poll_table.
The head entry in the full_poll_table is the extra fd. So we poll the
full_poll_table, while the aiop_poll_table still has matching indices
with the aiop_table.
Hurrah for C aliasing rules.
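
The two-table layout can be sketched in isolation as follows. This is a simplified illustration, not the code from the patch: the struct `PollTables` and the `tables_*` and `full_nfds` names are hypothetical, and plain `malloc`/`realloc` stand in for the RTS allocation helpers.

```c
#include <assert.h>
#include <poll.h>
#include <stdlib.h>

/* Hypothetical struct for this sketch; not the RTS's CapIOManager. */
typedef struct {
    struct pollfd *full;  /* entry 0 is the wakeup fd                  */
    struct pollfd *aiop;  /* always full + 1; indices match the
                             table of pending I/O requests             */
    size_t capacity;      /* number of entries reachable through aiop  */
} PollTables;

/* Start with only the head entry, so the aiop view is empty. */
static int tables_init(PollTables *t, int wakeup_fd_r)
{
    t->full = malloc(sizeof(struct pollfd));
    if (t->full == NULL) return -1;
    t->full[0] = (struct pollfd) {
        .fd = wakeup_fd_r, .events = POLLIN, .revents = 0
    };
    t->aiop = t->full + 1;
    t->capacity = 0;
    return 0;
}

/* Grow to hold newcap request entries plus the head entry. */
static int tables_enlarge(PollTables *t, size_t newcap)
{
    struct pollfd *p = realloc(t->full,
                               sizeof(struct pollfd) * (newcap + 1));
    if (p == NULL) return -1;
    /* poll() ignores entries whose fd is negative. */
    for (size_t i = t->capacity; i < newcap; i++) {
        p[i + 1] = (struct pollfd) { .fd = -1, .events = 0, .revents = 0 };
    }
    t->full = p;
    t->aiop = p + 1;      /* re-establish the alias after realloc */
    t->capacity = newcap;
    return 0;
}

/* Number of entries to pass to poll(): the requests plus the head. */
static nfds_t full_nfds(size_t n_requests)
{
    return (nfds_t)n_requests + 1;
}

static void tables_free(PollTables *t)
{
    free(t->full);        /* only the full table owns the memory */
    t->full = t->aiop = NULL;
    t->capacity = 0;
}
```

The point to note is that only the full table is ever allocated, reallocated or freed; the aliased pointer must be recomputed after every reallocation, since `realloc` may move the block.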
- - - - -
24b6f621 by Duncan Coutts at 2026-02-16T00:30:50+00:00
Add wakeupIOManager support for win32 legacy I/O manager
- - - - -
bc97a19f by Duncan Coutts at 2026-02-16T00:30:50+00:00
wakeupIOManager is now required for all I/O managers
We are going to rely on it. Previously it could be a no-op. Update the
docs in the header file.
Also, temporarily disable awaitCompletedTimeoutsOrIO post-condition
assertion. It will become more complicated due to wakeupIOManager, and
it's not yet clear how to express it.
We will re-introduce a post condition after a few more changes.
- - - - -
18 changed files:
- rts/Capability.c
- rts/IOManager.c
- rts/IOManager.h
- rts/IOManagerInternals.h
- rts/RtsStartup.c
- rts/Schedule.c
- rts/Schedule.h
- + rts/posix/FdWakeup.c
- + rts/posix/FdWakeup.h
- + rts/posix/MIO.c
- + rts/posix/MIO.h
- rts/posix/Poll.c
- rts/posix/Poll.h
- rts/posix/Select.c
- rts/posix/Select.h
- rts/posix/Signals.c
- rts/posix/Signals.h
- rts/rts.cabal
Changes:
=====================================
rts/Capability.c
=====================================
@@ -1280,6 +1280,7 @@ shutdownCapabilities(Task *task, bool safe)
static void
freeCapability (Capability *cap)
{
+ freeCapabilityIOManager(cap->iomgr);
stgFree(cap->mut_lists);
stgFree(cap->saved_mut_lists);
if (cap->current_segments) {
=====================================
rts/IOManager.c
=====================================
@@ -39,7 +39,7 @@
#endif
#if defined(IOMGR_ENABLED_MIO_POSIX)
-#include "posix/Signals.h"
+#include "posix/MIO.h"
#include "Prelude.h"
#endif
@@ -343,9 +343,7 @@ void initCapabilityIOManager(CapIOManager *iomgr)
switch (iomgr_type) {
#if defined(IOMGR_ENABLED_SELECT)
case IO_MANAGER_SELECT:
- iomgr->blocked_queue_hd = END_TSO_QUEUE;
- iomgr->blocked_queue_tl = END_TSO_QUEUE;
- iomgr->sleeping_queue = END_TSO_QUEUE;
+ initCapabilityIOManagerSelect(iomgr);
break;
#endif
@@ -373,11 +371,31 @@ void initCapabilityIOManager(CapIOManager *iomgr)
}
+void freeCapabilityIOManager(CapIOManager *iomgr)
+{
+ switch (iomgr_type) {
+#if defined(IOMGR_ENABLED_SELECT)
+ case IO_MANAGER_SELECT:
+ freeCapabilityIOManagerSelect(iomgr);
+ break;
+#endif
+
+#if defined(IOMGR_ENABLED_POLL)
+ case IO_MANAGER_POLL:
+ freeCapabilityIOManagerPoll(iomgr);
+ break;
+#endif
+ default:
+ break;
+ }
+}
+
+
/* Called late in the RTS initialisation
*/
-void initIOManager(void)
+void startIOManager(void)
{
- debugTrace(DEBUG_iomanager, "initialising %s I/O manager", showIOManager());
+ debugTrace(DEBUG_iomanager, "starting %s I/O manager", showIOManager());
switch (iomgr_type) {
@@ -441,7 +459,7 @@ void initIOManager(void)
/* Called from forkProcess in the child process on the surviving capability.
*/
void
-initIOManagerAfterFork(CapIOManager *iomgr, Capability **pcap)
+restartIOManager(CapIOManager *iomgr, Capability **pcap)
{
switch (iomgr_type) {
@@ -546,8 +564,21 @@ exitIOManager(bool wait_threads)
*/
void wakeupIOManager(void)
{
+ debugTrace(DEBUG_iomanager, "Sending wakeup to I/O manager...");
switch (iomgr_type) {
+#if defined(IOMGR_ENABLED_SELECT)
+ case IO_MANAGER_SELECT:
+ wakeupIOManagerSelect(MainCapability.iomgr);
+ break;
+#endif
+
+#if defined(IOMGR_ENABLED_POLL)
+ case IO_MANAGER_POLL:
+ wakeupIOManagerPoll(MainCapability.iomgr);
+ break;
+#endif
+
#if defined(IOMGR_ENABLED_MIO_POSIX)
case IO_MANAGER_MIO_POSIX:
/* MIO Posix implementation in posix/Signals.c */
@@ -572,8 +603,13 @@ void wakeupIOManager(void)
#endif
break;
#endif
- default:
+#if defined(IOMGR_ENABLED_WIN32_LEGACY)
+ case IO_MANAGER_WIN32_LEGACY:
+ abandonRequestWait();
break;
+#endif
+ default:
+ barf("wakeupIOManager not implemented");
}
}
@@ -782,7 +818,9 @@ void awaitCompletedTimeoutsOrIO(CapIOManager *iomgr)
default:
barf("pollCompletedTimeoutsOrIO not implemented");
}
- ASSERT(!emptyRunQueue(iomgr->cap) || getSchedState() != SCHED_RUNNING);
+ // FIXME: the post condition is now more complicated. Await can now simply
+ // be interrupted by wakeupIOManager.
+ // ASSERT(!emptyRunQueue(iomgr->cap) || getSchedState() != SCHED_RUNNING);
}
=====================================
rts/IOManager.h
=====================================
@@ -242,11 +242,33 @@ CapIOManager *allocCapabilityIOManager(Capability *cap);
*/
void initCapabilityIOManager(CapIOManager *iomgr);
+/* When shutting down a capability, or after forkProcess, free the resources
+ * held by a CapIOManager to put it back into a state in which either it can be
+ * re-initialised using initCapabilityIOManager, or the whole structure freed.
+ *
+ * Note that this does not free the CapIOManager structure itself, just the
+ * contents.
+ *
+ * This is used during capability shutdown, during RTS shutdown. It is not used
+ * when reducing the number of capabilities. Capabilities are disabled rather
+ * than freed entirely: the I/O manager keeps running but threads that become
+ * runnable are migrated away.
+ *
+ * It is also used after forkProcess.
+ */
+void freeCapabilityIOManager(CapIOManager *iomgr);
+
+/* CapIOManager life cycle:
+ *
+ * alloc -> init -> free -> free struct
+ * ^ |
+ * +--------+
+ */
/* Init hook: called from hs_init_ghc, very late in the startup after almost
* everything else is done.
*/
-void initIOManager(void);
+void startIOManager(void);
/* Init hook: called from forkProcess in the child process on the surviving
@@ -255,8 +277,8 @@ void initIOManager(void);
* This is synchronous and can run Haskell code, so can change the given cap.
* TODO: it would make for a cleaner API here if this were made asynchronous.
*/
-void initIOManagerAfterFork(CapIOManager *iomgr,
- /* inout */ Capability **pcap);
+void restartIOManager(CapIOManager *iomgr,
+ /* inout */ Capability **pcap);
/* TODO: rationalise initIOManager and initIOManagerAfterFork into a single
per-capability init function.
@@ -283,19 +305,13 @@ void stopIOManager(void);
void exitIOManager(bool wait_threads);
-/* Wakeup hook: called from the scheduler's wakeUpRts (currently only in
- * threaded mode).
+/* Wakeup hook: called from the scheduler's wakeUpRts().
*
* The I/O manager can be blocked waiting on I/O or timers. Sometimes there are
* other external events where we need to wake up the I/O manager and return
- * to the schedulr.
- *
- * At the moment, all the non-threaded I/O managers will do this automagically
- * since a signal will interrupt any waiting system calls, so at the moment
- * the implementation for the non-threaded I/O managers does nothing.
+ * to the scheduler.
*
- * For the I/O managers in threaded mode, this arranges to unblock the I/O
- * manager if it waa blocked waiting.
+ * This arranges to unblock the I/O manager if it was blocked waiting.
*/
void wakeupIOManager(void);
=====================================
rts/IOManagerInternals.h
=====================================
@@ -46,6 +46,11 @@ struct _CapIOManager {
StgTSO *sleeping_queue;
#endif
+#if defined(IOMGR_ENABLED_SELECT) || defined(IOMGR_ENABLED_POLL)
+ /* FDs for waking up the I/O manager when it is blocked waiting */
+ int wakeup_fd_r, wakeup_fd_w;
+#endif
+
#if defined(IOMGR_ENABLED_POLL)
/* AIOP and timeout collections shared by several I/O manager impls */
ClosureTable aiop_table;
@@ -53,8 +58,11 @@ struct _CapIOManager {
#endif
#if defined(IOMGR_ENABLED_POLL)
- /* Auxiliary table with size and indexes matching the aiop_table */
- struct pollfd *aiop_poll_table;
+ /* Auxiliary table with size and indexes matching the aiop_table. This is
+ * aliased to the tail of the full poll table, which has a head entry for
+ * the wakeup_fd_r above, so we can also poll that fd.
+ */
+ struct pollfd *aiop_poll_table, *full_poll_table;
#endif
#if defined(IOMGR_ENABLED_WIN32_LEGACY)
=====================================
rts/RtsStartup.c
=====================================
@@ -427,7 +427,7 @@ hs_init_ghc(int *argc, char **argv[], RtsConfig rts_config)
}
#endif
- initIOManager();
+ startIOManager();
x86_init_fpu();
=====================================
rts/Schedule.c
=====================================
@@ -174,6 +174,11 @@ static void deleteAllThreads (void);
static void deleteThread_(StgTSO *tso);
#endif
+#if defined(FORKPROCESS_PRIMOP_SUPPORTED)
+static void truncateRunQueue(Capability *cap);
+#endif
+static StgTSO *popRunQueue (Capability *cap);
+
/* ---------------------------------------------------------------------------
Main scheduling loop.
@@ -591,38 +596,6 @@ run_thread:
} /* end of while() */
}
-/* -----------------------------------------------------------------------------
- * Run queue operations
- * -------------------------------------------------------------------------- */
-
-static void
-removeFromRunQueue (Capability *cap, StgTSO *tso)
-{
- if (tso->block_info.prev == END_TSO_QUEUE) {
- ASSERT(cap->run_queue_hd == tso);
- cap->run_queue_hd = tso->_link;
- } else {
- setTSOLink(cap, tso->block_info.prev, tso->_link);
- }
- if (tso->_link == END_TSO_QUEUE) {
- ASSERT(cap->run_queue_tl == tso);
- cap->run_queue_tl = tso->block_info.prev;
- } else {
- setTSOPrev(cap, tso->_link, tso->block_info.prev);
- }
- tso->_link = tso->block_info.prev = END_TSO_QUEUE;
- cap->n_run_queue--;
-
- IF_DEBUG(sanity, checkRunQueue(cap));
-}
-
-void
-promoteInRunQueue (Capability *cap, StgTSO *tso)
-{
- removeFromRunQueue(cap, tso);
- pushOnRunQueue(cap, tso);
-}
-
/* -----------------------------------------------------------------------------
* scheduleFindWork()
*
@@ -2191,7 +2164,15 @@ forkProcess(HsStablePtr *entry
// bound threads for which the corresponding Task does not
// exist.
truncateRunQueue(cap);
- cap->n_run_queue = 0;
+
+ // Reset and re-initialise the capability's I/O manager,
+ // to get the I/O manager ready again.
+ //
+ // Any threads waiting on I/O or timers should have been
+ // removed from I/O manager queues by deleteThread_ above.
+ // TODO: but we could assert that here.
+ freeCapabilityIOManager(cap->iomgr);
+ initCapabilityIOManager(cap->iomgr);
// Any suspended C-calling Tasks are no more, their OS threads
// don't exist now:
@@ -2208,7 +2189,7 @@ forkProcess(HsStablePtr *entry
cap->n_returning_tasks = 0;
#endif
- // Release all caps except 0, we'll use that for starting
+ // Release all caps except 0, we'll use that for restarting
// the IO manager and running the client action below.
if (cap->no != 0) {
task->cap = cap;
@@ -2232,7 +2213,7 @@ forkProcess(HsStablePtr *entry
// like startup event, capabilities, process info etc
traceTaskCreate(task, cap);
- initIOManagerAfterFork(cap->iomgr, &cap);
+ restartIOManager(cap->iomgr, &cap);
// start timer after the IOManager is initialized
// (the idle GC may wake up the IOManager)
@@ -2337,6 +2318,10 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
// the capability; we don't have to worry about GC data
// structures, the nursery, etc.
//
+ // This approach also handles threads blocked on I/O. Such threads
+ // remain blocked, and when I/O completes and threads become runnable
+ // then they are migrated away.
+ //
for (n = new_n_capabilities; n < enabled_capabilities; n++) {
getCapability(n)->disabled = true;
traceCapDisable(getCapability(n));
@@ -2997,7 +2982,7 @@ pushOnRunQueue (Capability *cap, StgTSO *tso)
cap->n_run_queue++;
}
-StgTSO *popRunQueue (Capability *cap)
+static StgTSO *popRunQueue (Capability *cap)
{
ASSERT(cap->n_run_queue > 0);
StgTSO *t = cap->run_queue_hd;
@@ -3017,6 +3002,45 @@ StgTSO *popRunQueue (Capability *cap)
return t;
}
+#if defined(FORKPROCESS_PRIMOP_SUPPORTED)
+static void truncateRunQueue(Capability *cap)
+{
+ // Can only be called by the task owning the capability.
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->run_queue_hd, "truncateRunQueue");
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->run_queue_tl, "truncateRunQueue");
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->n_run_queue, "truncateRunQueue");
+ cap->run_queue_hd = END_TSO_QUEUE;
+ cap->run_queue_tl = END_TSO_QUEUE;
+ cap->n_run_queue = 0;
+}
+#endif
+
+static void removeFromRunQueue (Capability *cap, StgTSO *tso)
+{
+ if (tso->block_info.prev == END_TSO_QUEUE) {
+ ASSERT(cap->run_queue_hd == tso);
+ cap->run_queue_hd = tso->_link;
+ } else {
+ setTSOLink(cap, tso->block_info.prev, tso->_link);
+ }
+ if (tso->_link == END_TSO_QUEUE) {
+ ASSERT(cap->run_queue_tl == tso);
+ cap->run_queue_tl = tso->block_info.prev;
+ } else {
+ setTSOPrev(cap, tso->_link, tso->block_info.prev);
+ }
+ tso->_link = tso->block_info.prev = END_TSO_QUEUE;
+ cap->n_run_queue--;
+
+ IF_DEBUG(sanity, checkRunQueue(cap));
+}
+
+void promoteInRunQueue (Capability *cap, StgTSO *tso)
+{
+ removeFromRunQueue(cap, tso);
+ pushOnRunQueue(cap, tso);
+}
+
/* -----------------------------------------------------------------------------
raiseExceptionHelper
=====================================
rts/Schedule.h
=====================================
@@ -164,10 +164,6 @@ void appendToRunQueue (Capability *cap, StgTSO *tso);
*/
void pushOnRunQueue (Capability *cap, StgTSO *tso);
-/* Pop the first thread off the runnable queue.
- */
-StgTSO *popRunQueue (Capability *cap);
-
INLINE_HEADER StgTSO *
peekRunQueue (Capability *cap)
{
@@ -184,18 +180,6 @@ emptyRunQueue(Capability *cap)
return cap->n_run_queue == 0;
}
-INLINE_HEADER void
-truncateRunQueue(Capability *cap)
-{
- // Can only be called by the task owning the capability.
- TSAN_ANNOTATE_BENIGN_RACE(&cap->run_queue_hd, "truncateRunQueue");
- TSAN_ANNOTATE_BENIGN_RACE(&cap->run_queue_tl, "truncateRunQueue");
- TSAN_ANNOTATE_BENIGN_RACE(&cap->n_run_queue, "truncateRunQueue");
- cap->run_queue_hd = END_TSO_QUEUE;
- cap->run_queue_tl = END_TSO_QUEUE;
- cap->n_run_queue = 0;
-}
-
#endif /* !IN_STG_CODE */
#include "EndPrivate.h"
=====================================
rts/posix/FdWakeup.c
=====================================
@@ -0,0 +1,132 @@
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team 2025
+ *
+ * Utilities for a simple fd-based cross-thread wakeup mechanism.
+ *
+ * This is used in I/O managers, to provide a mechanism to wake them when they
+ * are blocked waiting on fds and timeouts. The mechanism works by including
+ * the read end fd into the set of fds the I/O manager waits on, and when a
+ * wake up is needed, the write end fd is used.
+ *
+ * This is implemented using either eventfd() or pipe().
+ *
+ * Linux 2.6.22+ and FreeBSD 13+ support eventfd. It is a single fd with a
+ * 64bit counter. It uses less resources than a pipe, and is probably a tad
+ * faster. Using write() adds to the counter, while read() reads and resets
+ * it. This gives us event combining.
+ *
+ * Otherwise we use a classic unix pipe.
+ *
+ * -------------------------------------------------------------------------*/
+
+#include "rts/PosixSource.h"
+#include "Rts.h"
+
+#include "FdWakeup.h"
+
+#include
+#include
+
+#ifdef HAVE_SYS_EVENTFD_H
+#include <sys/eventfd.h>
+#endif
+
+#if !defined(HAVE_EVENTFD) \
+ || (defined(HAVE_EVENTFD) && !(defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)))
+static void fcntl_CLOEXEC_NONBLOCK(int fd)
+{
+ int res1 = fcntl(fd, F_SETFD, FD_CLOEXEC);
+ int res2 = fcntl(fd, F_SETFL, O_NONBLOCK);
+ if (RTS_UNLIKELY(res1 < 0 || res2 < 0)) {
+ sysErrorBelch("newFdWakeup fcntl()");
+ stg_exit(EXIT_FAILURE);
+ }
+}
+#endif
+
+void newFdWakeup(int *wakeup_fd_r, int *wakeup_fd_w)
+{
+#if defined(HAVE_EVENTFD)
+ int wakeup_fd;
+#if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)
+ wakeup_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+#else
+ wakeup_fd = eventfd(0, 0);
+ if (wakeup_fd >= 0) fcntl_CLOEXEC_NONBLOCK(wakeup_fd);
+#endif
+ if (RTS_UNLIKELY(wakeup_fd < 0)) {
+ sysErrorBelch("newFdWakeup eventfd()");
+ stg_exit(EXIT_FAILURE);
+ }
+ /* eventfd uses the same fd for each end */
+ *wakeup_fd_r = wakeup_fd;
+ *wakeup_fd_w = wakeup_fd;
+#else
+ int pipefd[2];
+ int res;
+ res = pipe(pipefd);
+ if (RTS_UNLIKELY(res < 0)) {
+ sysErrorBelch("newFdWakeup pipe");
+ stg_exit(EXIT_FAILURE);
+ }
+ fcntl_CLOEXEC_NONBLOCK(pipefd[0]);
+ fcntl_CLOEXEC_NONBLOCK(pipefd[1]);
+ *wakeup_fd_r = pipefd[0]; /* read end */
+ *wakeup_fd_w = pipefd[1]; /* write end */
+#endif
+}
+
+void closeFdWakeup(int wakeup_fd_r, int wakeup_fd_w)
+{
+#if defined(HAVE_EVENTFD)
+ ASSERT(wakeup_fd_r == wakeup_fd_w);
+ close(wakeup_fd_r);
+#else
+ ASSERT(wakeup_fd_r != wakeup_fd_w);
+ close(wakeup_fd_r);
+ close(wakeup_fd_w);
+#endif
+}
+
+void sendFdWakeup(int wakeup_fd_w)
+{
+ int res;
+#if defined(HAVE_EVENTFD)
+ uint64_t val = 1;
+ res = write(wakeup_fd_w, &val, 8);
+#else
+ unsigned char buf = 1;
+ res = write(wakeup_fd_w, &buf, 1);
+#endif
+ if (RTS_UNLIKELY(res < 0)) {
+ /* Unlikely the pipe buffer will fill, but it would not be an error. */
+ if (errno == EAGAIN) return;
+ sysErrorBelch("sendFdWakeup write");
+ stg_exit(EXIT_FAILURE);
+ }
+}
+
+void collectFdWakeup(int wakeup_fd_r)
+{
+ int res;
+#if defined(HAVE_EVENTFD)
+ uint64_t buf;
+ /* eventfd combines events into one counter, so a single read is enough */
+ res = read(wakeup_fd_r, &buf, 8);
+#else
+ /* Drain the pipe buffer. Multiple wakeup notifications could
+ * have been sent before we have a chance to collect them.
+ */
+ uint64_t buf;
+ do {
+ res = read(wakeup_fd_r, &buf, 8);
+ } while (res == 8);
+#endif
+ if (RTS_UNLIKELY(res < 0)) {
+ /* After the first pipe read, it could block */
+ if (errno == EAGAIN) return;
+ sysErrorBelch("collectFdWakeup read");
+ stg_exit(EXIT_FAILURE);
+ }
+}
=====================================
rts/posix/FdWakeup.h
=====================================
@@ -0,0 +1,27 @@
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team 2025
+ *
+ * Utilities for a simple fd-based cross-thread wakeup mechanism.
+ *
+ * This is used in I/O managers, to provide a mechanism to wake them when they
+ * are blocked waiting on fds and timeouts. The mechanism works by including
+ * the read end fd into the set of fds the I/O manager waits on, and when a
+ * wake up is needed, the write end fd is used.
+ *
+ * Prototypes for functions in FdWakeup.c
+ *
+ * -------------------------------------------------------------------------*/
+
+#pragma once
+
+#include "BeginPrivate.h"
+
+void newFdWakeup(int *fd_r, int *fd_w);
+void closeFdWakeup(int fd_r, int fd_w);
+
+void sendFdWakeup(int fd_w);
+void collectFdWakeup(int fd_r);
+
+#include "EndPrivate.h"
+
=====================================
rts/posix/MIO.c
=====================================
@@ -0,0 +1,163 @@
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 1998-2005
+ *
+ * Signal processing / handling.
+ *
+ * ---------------------------------------------------------------------------*/
+
+#include "rts/PosixSource.h"
+#include "Rts.h"
+
+#include "Schedule.h"
+#include "RtsUtils.h"
+#include "Prelude.h"
+#include "ThreadLabels.h"
+
+#include "MIO.h"
+#include "IOManager.h"
+#include "IOManagerInternals.h"
+
+#if defined(HAVE_ERRNO_H)
+# include <errno.h>
+#endif
+
+#include
+#include
+
+// Here's the pipe into which we will send our signals
+static int io_manager_wakeup_fd = -1;
+static int timer_manager_control_wr_fd = -1;
+// TODO: Eliminate these globals. Put them into the CapIOManager, but the
+// problem is these are shared across all caps, not per cap.
+
+#define IO_MANAGER_WAKEUP 0xff
+#define IO_MANAGER_DIE 0xfe
+#define IO_MANAGER_SYNC 0xfd
+
+void setTimerManagerControlFd(int fd) {
+ RELAXED_STORE(&timer_manager_control_wr_fd, fd);
+}
+
+void
+setIOManagerWakeupFd (int fd)
+{
+ // only called when THREADED_RTS, but unconditionally
+ // compiled here because GHC.Event.Control depends on it.
+ SEQ_CST_STORE(&io_manager_wakeup_fd, fd);
+}
+
+#if defined(THREADED_RTS)
+void timerManagerNotifySignal(int sig, siginfo_t *info)
+{
+ StgWord8 buf[sizeof(siginfo_t) + 1];
+ int r;
+
+ buf[0] = sig;
+ if (info == NULL) {
+ // info may be NULL on Solaris (see #3790)
+ memset(buf+1, 0, sizeof(siginfo_t));
+ } else {
+ memcpy(buf+1, info, sizeof(siginfo_t));
+ }
+
+ int timer_control_fd = RELAXED_LOAD(&timer_manager_control_wr_fd);
+ if (0 <= timer_control_fd)
+ {
+ r = write(timer_control_fd, buf, sizeof(siginfo_t)+1);
+ if (r == -1 && errno == EAGAIN) {
+ errorBelch("lost signal due to full pipe: %d\n", sig);
+ }
+ }
+
+ // If the IO manager hasn't told us what the FD of the write end
+ // of its pipe is, there's not much we can do here, so just ignore
+ // the signal..
+}
+#endif
+
+
+/* -----------------------------------------------------------------------------
+ * Wake up at least one IO or timer manager HS thread.
+ * -------------------------------------------------------------------------- */
+void
+ioManagerWakeup (void)
+{
+ int r;
+ const int wakeup_fd = SEQ_CST_LOAD(&io_manager_wakeup_fd);
+ // Wake up the IO Manager thread by sending a byte down its pipe
+ if (wakeup_fd >= 0) {
+#if defined(HAVE_EVENTFD)
+ StgWord64 n = (StgWord64)IO_MANAGER_WAKEUP;
+ r = write(wakeup_fd, (char *) &n, 8);
+#else
+ StgWord8 byte = (StgWord8)IO_MANAGER_WAKEUP;
+ r = write(wakeup_fd, &byte, 1);
+#endif
+ /* N.B. If the TimerManager is shutting down as we run this
+ * then there is a possibility that our first read of
+ * io_manager_wakeup_fd is non-negative, but before we get to the
+ * write the file is closed. If this occurs, io_manager_wakeup_fd
+ * will be written into with -1 (GHC.Event.Control does this prior
+ * to closing), so checking this allows us to distinguish this case.
+ * To ensure we observe the correct ordering, we declare the
+ * io_manager_wakeup_fd as volatile.
+ * Since this is not an error condition, we do not print the error
+ * message in this case.
+ */
+ if (r == -1 && SEQ_CST_LOAD(&io_manager_wakeup_fd) >= 0) {
+ sysErrorBelch("ioManagerWakeup: write");
+ }
+ }
+}
+
+#if defined(THREADED_RTS)
+void
+ioManagerDie (void)
+{
+ StgWord8 byte = (StgWord8)IO_MANAGER_DIE;
+ uint32_t i;
+ int r;
+
+ {
+ // Shut down timer manager
+ const int fd = RELAXED_LOAD(&timer_manager_control_wr_fd);
+ if (0 <= fd) {
+ r = write(fd, &byte, 1);
+ if (r == -1) { sysErrorBelch("ioManagerDie: write"); }
+ RELAXED_STORE(&timer_manager_control_wr_fd, -1);
+ }
+ }
+
+ {
+ // Shut down IO managers
+ for (i=0; i < getNumCapabilities(); i++) {
+ const int fd = RELAXED_LOAD(&getCapability(i)->iomgr->control_fd);
+ if (0 <= fd) {
+ r = write(fd, &byte, 1);
+ if (r == -1) { sysErrorBelch("ioManagerDie: write"); }
+ RELAXED_STORE(&getCapability(i)->iomgr->control_fd, -1);
+ }
+ }
+ }
+}
+
+void
+ioManagerStartCap (Capability **cap)
+{
+ rts_evalIO(cap,ensureIOManagerIsRunning_closure,NULL);
+}
+
+void
+ioManagerStart (void)
+{
+ // Make sure the IO manager thread is running
+ Capability *cap;
+ if (SEQ_CST_LOAD(&timer_manager_control_wr_fd) < 0 || SEQ_CST_LOAD(&io_manager_wakeup_fd) < 0) {
+ cap = rts_lock();
+ ioManagerStartCap(&cap);
+ rts_unlock(cap);
+ }
+}
+#endif
+
=====================================
rts/posix/MIO.h
=====================================
@@ -0,0 +1,30 @@
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 1998-2005
+ *
+ * Signal processing / handling.
+ *
+ * ---------------------------------------------------------------------------*/
+
+#pragma once
+
+#include "IOManager.h"
+
+#if defined(HAVE_SIGNAL_H)
+# include <signal.h>
+#endif
+
+#include "BeginPrivate.h"
+
+/* Communicating with the IO manager thread (see GHC.Conc).
+ */
+void ioManagerWakeup (void);
+#if defined(THREADED_RTS)
+void ioManagerDie (void);
+void ioManagerStart (void);
+void ioManagerStartCap (/* inout */ Capability **cap);
+
+void timerManagerNotifySignal(int sig, siginfo_t *info);
+#endif
+
+#include "EndPrivate.h"
=====================================
rts/posix/Poll.c
=====================================
@@ -41,6 +41,7 @@
#include "IOManagerInternals.h"
#include "Timeout.h"
+#include "FdWakeup.h"
/******************************************************************************
@@ -107,8 +108,9 @@ timeout (if any) as the poll() timeout parameter.
The CapIOManager structure for this I/O manager contains:
ClosureTable aiop_table;
- struct pollfd *aiop_poll_table;
+ struct pollfd *aiop_poll_table, *full_poll_table;
StgTimeoutQueue *timeout_queue;
+ int wakeup_fd_r, wakeup_fd_w;
We also support the Linux-specific ppoll API which supports higher resolution
time delays -- nanoseconds rather than milliseconds as in classic poll(). It
@@ -117,6 +119,15 @@ also allows the signal mask to be adjusted, but we do not make use of this.
int ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *tmo_p, const sigset_t *sigmask);
+We have both aiop_poll_table and full_poll_table. This is to cope with needing
+to wait on the special extra file descriptor wakeup_fd_r. This fd is used to
+support waking the I/O manager when we are blocked in a poll call. This
+requires waiting on an extra fd that has no corresponding entry in the
+aiop_table. To manage this quirk, we alias the aiop_poll_table to be the tail
+of the full_poll_table and have the first entry of the full_poll_table be the
+wakeup_fd_r. This means the aiop_poll_table indices match up exactly with the
+aiop_table, but still allows the full_poll_table to have an extra entry.
+
******************************************************************************/
/* Forward declarations */
@@ -129,8 +140,31 @@ static void reportPollError(int res, nfds_t nfds) STG_NORETURN;
void initCapabilityIOManagerPoll(CapIOManager *iomgr)
{
initClosureTable(&iomgr->aiop_table, ClosureTableCompact);
- iomgr->aiop_poll_table = NULL;
iomgr->timeout_queue = emptyTimeoutQueue();
+
+ newFdWakeup(&iomgr->wakeup_fd_r, &iomgr->wakeup_fd_w);
+
+ iomgr->full_poll_table = stgMallocBytes(sizeof(struct pollfd) /* size 1 */,
+ "initCapabilityIOManagerPoll");
+ iomgr->full_poll_table[0] = (struct pollfd) {
+ .fd = iomgr->wakeup_fd_r,
+ .events = POLLIN,
+ .revents = 0
+ };
+ iomgr->aiop_poll_table = iomgr->full_poll_table+1; /* hence empty */
+}
+
+
+void freeCapabilityIOManagerPoll(CapIOManager *iomgr)
+{
+ stgFree(iomgr->full_poll_table);
+ closeFdWakeup(iomgr->wakeup_fd_r, iomgr->wakeup_fd_w);
+}
+
+
+void wakeupIOManagerPoll(CapIOManager *iomgr)
+{
+ sendFdWakeup(iomgr->wakeup_fd_w);
}
@@ -275,7 +309,7 @@ static void notifyIOCompletion(CapIOManager *iomgr, StgAsyncIOOp *aiop)
}
-static void processIOCompletions(CapIOManager *iomgr, int ncompletions)
+static bool processIOCompletions(CapIOManager *iomgr, int ncompletions)
{
/* The scheme we use with poll is that we have a dense poll table, and a
* corresponding table that maps to the closure table index. The poll
@@ -285,6 +319,19 @@ static void processIOCompletions(CapIOManager *iomgr, int ncompletions)
*/
debugTrace(DEBUG_iomanager, "processIOCompletions(ncompletions = %d)",
ncompletions);
+
+ bool wakeup;
+ /* If the wakeup_fd_r is ready, collect it */
+ if (iomgr->full_poll_table[0].revents) {
+ ASSERT(iomgr->full_poll_table[0].fd == iomgr->wakeup_fd_r);
+ collectFdWakeup(iomgr->wakeup_fd_r);
+ ncompletions--;
+ wakeup = true;
+ debugTrace(DEBUG_iomanager, "Received wakeup in poll I/O manager.");
+ } else {
+ wakeup = false;
+ }
+
struct pollfd *aiop_poll_table = iomgr->aiop_poll_table;
int n = ncompletions;
int i = 0;
@@ -337,11 +384,14 @@ static void processIOCompletions(CapIOManager *iomgr, int ncompletions)
i++;
}
}
+ return wakeup;
}
void pollCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
{
+ ASSERT(iomgr->aiop_poll_table == iomgr->full_poll_table+1);
+
if (!isEmptyTimeoutQueue(iomgr->timeout_queue)) {
Time now = getProcessElapsedTime();
processTimeoutCompletions(iomgr, now);
@@ -349,20 +399,20 @@ void pollCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
if (!isEmptyClosureTable(&iomgr->aiop_table)) {
- nfds_t nfds = sizeClosureTable(&iomgr->aiop_table);
+ nfds_t nfds = sizeClosureTable(&iomgr->aiop_table) + 1;
/* Poll for I/O readiness, without waiting. */
#if defined(HAVE_DECL_PPOLL) && HAVE_DECL_PPOLL == 1
/* We could use poll here, since we use no timeout, but for
consistency we use the same syscall as at the other call site. */
struct timespec tv = (struct timespec) { .tv_sec = 0, .tv_nsec = 0 };
- int res = ppoll(iomgr->aiop_poll_table, nfds, &tv, NULL);
+ int res = ppoll(iomgr->full_poll_table, nfds, &tv, NULL);
debugTrace(DEBUG_iomanager,
"ppoll(nfds = %d, timeout.sec = 0, timeout.nsec = 0) = %d",
nfds, res);
#else
- int res = poll(iomgr->aiop_poll_table, nfds, 0);
+ int res = poll(iomgr->full_poll_table, nfds, 0);
debugTrace(DEBUG_iomanager,
"poll(nfds = %d, timeout_ms = 0) = %d",
@@ -390,6 +440,10 @@ void pollCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
{
+ bool wakeup = false; /* got woken up via wakeupIOManager */
+
+ ASSERT(iomgr->aiop_poll_table == iomgr->full_poll_table+1);
+
/* Loop until we've woken up some threads. This loop is needed because the
* poll() timing isn't accurate, we sometimes sleep for a while but not
* long enough to wake up a thread in a threadDelay. Or we may need to
@@ -422,9 +476,9 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
#endif
/* Check for I/O readiness, possibly waiting. */
- nfds_t nfds = sizeClosureTable(&iomgr->aiop_table);
+ nfds_t nfds = sizeClosureTable(&iomgr->aiop_table) + 1;
#if defined(HAVE_DECL_PPOLL) && HAVE_DECL_PPOLL == 1
- int res = ppoll(iomgr->aiop_poll_table, nfds, timeout_ns, NULL);
+ int res = ppoll(iomgr->full_poll_table, nfds, timeout_ns, NULL);
debugTrace(DEBUG_iomanager,
"ppoll(nfds = %d, timeout.sec = %d, timeout.nsec = %d) = %d",
@@ -432,7 +486,7 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
timeout_ns == NULL ? 0 : timeout_ns->tv_nsec,
res);
#else
- int res = poll(iomgr->aiop_poll_table, nfds, timeout_ms);
+ int res = poll(iomgr->full_poll_table, nfds, timeout_ms);
debugTrace(DEBUG_iomanager,
"poll(nfds = %d, timeout_ms = %d) = %d",
@@ -454,7 +508,7 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
} else if (res > 0) {
int ncompletions = res;
ASSERT(ncompletions <= (int)nfds);
- processIOCompletions(iomgr, ncompletions);
+ wakeup = processIOCompletions(iomgr, ncompletions);
} else if (errno == EINTR) {
/* We got interrupted by a signal. In the non-threaded RTS, if the
@@ -479,6 +533,7 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
}
} while (emptyRunQueue(iomgr->cap)
+ && !wakeup
&& (getSchedState() == SCHED_RUNNING));
}
@@ -508,13 +563,17 @@ static bool enlargeTables(CapIOManager *iomgr)
bool ok = enlargeClosureTable(iomgr->cap, &iomgr->aiop_table, newcapacity);
if (RTS_UNLIKELY(!ok)) return false;
- /* Update the auxiliary aiop_poll_table to match */
- struct pollfd *aiop_poll_table;
- aiop_poll_table = stgReallocBytes(iomgr->aiop_poll_table,
- sizeof(struct pollfd) * newcapacity,
- "Poll.c: enlargeTables");
- iomgr->aiop_poll_table = aiop_poll_table;
+ /* Update the auxiliary aiop_poll_table to match. The full_poll_table is
+ * one bigger than the aiop_poll_table, since it has an extra entry at the
+ * front for wakeup_fd_r, with no corresponding aiop. */
+ iomgr->full_poll_table =
+ stgReallocBytes(iomgr->full_poll_table,
+ sizeof(struct pollfd) * (newcapacity+1),
+ "Poll.c: enlargeTables");
+ iomgr->aiop_poll_table = iomgr->full_poll_table+1;
+
/* Initialise the new part of the aiop_poll_table */
+ struct pollfd *aiop_poll_table = iomgr->aiop_poll_table;
for (int i = oldcapacity; i < newcapacity; i++) {
aiop_poll_table[i] = (struct pollfd) {
.fd = -1,
=====================================
rts/posix/Poll.h
=====================================
@@ -17,6 +17,8 @@
#if defined(IOMGR_ENABLED_POLL)
void initCapabilityIOManagerPoll(CapIOManager *iomgr);
+void freeCapabilityIOManagerPoll(CapIOManager *iomgr);
+void wakeupIOManagerPoll(CapIOManager *iomgr);
/* Synchronous I/O and timer operations */
bool syncIOWaitReadyPoll(CapIOManager *iomgr, StgTSO *tso,
=====================================
rts/posix/Select.c
=====================================
@@ -22,6 +22,7 @@
#include "IOManagerInternals.h"
#include "Stats.h"
#include "GetTime.h"
+#include "FdWakeup.h"
# if defined(HAVE_SYS_SELECT_H)
# include <sys/select.h>
@@ -54,6 +55,25 @@
#define TimeToLowResTimeRoundUp(t) (t)
#endif
+void initCapabilityIOManagerSelect(CapIOManager *iomgr)
+{
+ iomgr->blocked_queue_hd = END_TSO_QUEUE;
+ iomgr->blocked_queue_tl = END_TSO_QUEUE;
+ iomgr->sleeping_queue = END_TSO_QUEUE;
+
+ newFdWakeup(&iomgr->wakeup_fd_r, &iomgr->wakeup_fd_w);
+}
+
+void freeCapabilityIOManagerSelect(CapIOManager *iomgr)
+{
+ closeFdWakeup(iomgr->wakeup_fd_r, iomgr->wakeup_fd_w);
+}
+
+void wakeupIOManagerSelect(CapIOManager *iomgr)
+{
+ sendFdWakeup(iomgr->wakeup_fd_w);
+}
+
/*
* Return the time since the program started, in LowResTime,
* rounded down.
@@ -225,6 +245,7 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
bool seen_bad_fd = false;
struct timeval tv, *ptv;
LowResTime now;
+ bool wakeup = false; /* got woken up via wakeupIOManager */
IF_DEBUG(scheduler,
debugBelch("scheduler: checking for threads blocked on I/O");
@@ -252,6 +273,13 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
FD_ZERO(&rfd);
FD_ZERO(&wfd);
+ /* We're always interested in our wakeup fd */
+ {
+ int fd = iomgr->wakeup_fd_r;
+ maxfd = (fd > maxfd) ? fd : maxfd;
+ FD_SET(fd, &rfd);
+ }
+
for(tso = iomgr->blocked_queue_hd;
tso != END_TSO_QUEUE;
tso = next) {
@@ -376,6 +404,13 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
}
}
+ /* If the wakeup_fd_r is ready, collect it */
+ if (FD_ISSET(iomgr->wakeup_fd_r, &rfd)) {
+ collectFdWakeup(iomgr->wakeup_fd_r);
+ wakeup = true;
+ debugTrace(DEBUG_iomanager, "Received wakeup in select I/O manager.");
+ }
+
/* Step through the waiting queue, unblocking every thread that now has
* a file descriptor in a ready state.
*/
@@ -458,7 +493,8 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
}
} while (wait && getSchedState() == SCHED_RUNNING
- && emptyRunQueue(iomgr->cap));
+ && emptyRunQueue(iomgr->cap)
+ && !wakeup);
}
#endif /* IOMGR_ENABLED_SELECT */
=====================================
rts/posix/Select.h
=====================================
@@ -15,6 +15,10 @@ typedef StgWord LowResTime;
LowResTime getDelayTarget (HsInt us);
+void initCapabilityIOManagerSelect(CapIOManager *iomgr);
+void freeCapabilityIOManagerSelect(CapIOManager *iomgr);
+void wakeupIOManagerSelect(CapIOManager *iomgr);
+
void awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait);
#include "EndPrivate.h"
=====================================
rts/posix/Signals.c
=====================================
@@ -9,21 +9,12 @@
#include "rts/PosixSource.h"
#include "Rts.h"
-#include "Schedule.h"
#include "RtsSignals.h"
-#include "Signals.h"
-#include "IOManager.h"
#include "RtsUtils.h"
+#include "Schedule.h"
#include "Prelude.h"
-#include "Ticker.h"
#include "ThreadLabels.h"
-#include "Libdw.h"
-
-/* TODO: eliminate this include. This file should be about signals, not be
- * part of an I/O manager implementation. The code here that are really part
- * of an I/O manager should be moved into an appropriate I/O manager impl.
- */
-#include "IOManagerInternals.h"
+#include "MIO.h"
#if defined(alpha_HOST_ARCH)
# if defined(linux_HOST_OS)
@@ -45,10 +36,6 @@
# include
#endif
-#if defined(HAVE_EVENTFD_H)
-# include <sys/eventfd.h>
-#endif
-
#if defined(HAVE_TERMIOS_H)
#include <termios.h>
#endif
@@ -134,110 +121,6 @@ more_handlers(int sig)
nHandlers = sig + 1;
}
-// Here's the pipe into which we will send our signals
-static int io_manager_wakeup_fd = -1;
-static int timer_manager_control_wr_fd = -1;
-
-#define IO_MANAGER_WAKEUP 0xff
-#define IO_MANAGER_DIE 0xfe
-#define IO_MANAGER_SYNC 0xfd
-
-void setTimerManagerControlFd(int fd) {
- RELAXED_STORE(&timer_manager_control_wr_fd, fd);
-}
-
-void
-setIOManagerWakeupFd (int fd)
-{
- // only called when THREADED_RTS, but unconditionally
- // compiled here because GHC.Event.Control depends on it.
- SEQ_CST_STORE(&io_manager_wakeup_fd, fd);
-}
-
-/* -----------------------------------------------------------------------------
- * Wake up at least one IO or timer manager HS thread.
- * -------------------------------------------------------------------------- */
-void
-ioManagerWakeup (void)
-{
- int r;
- const int wakeup_fd = SEQ_CST_LOAD(&io_manager_wakeup_fd);
- // Wake up the IO Manager thread by sending a byte down its pipe
- if (wakeup_fd >= 0) {
-#if defined(HAVE_EVENTFD)
- StgWord64 n = (StgWord64)IO_MANAGER_WAKEUP;
- r = write(wakeup_fd, (char *) &n, 8);
-#else
- StgWord8 byte = (StgWord8)IO_MANAGER_WAKEUP;
- r = write(wakeup_fd, &byte, 1);
-#endif
- /* N.B. If the TimerManager is shutting down as we run this
- * then there is a possibility that our first read of
- * io_manager_wakeup_fd is non-negative, but before we get to the
- * write the file is closed. If this occurs, io_manager_wakeup_fd
- * will be written into with -1 (GHC.Event.Control does this prior
- * to closing), so checking this allows us to distinguish this case.
- * To ensure we observe the correct ordering, we declare the
- * io_manager_wakeup_fd as volatile.
- * Since this is not an error condition, we do not print the error
- * message in this case.
- */
- if (r == -1 && SEQ_CST_LOAD(&io_manager_wakeup_fd) >= 0) {
- sysErrorBelch("ioManagerWakeup: write");
- }
- }
-}
-
-#if defined(THREADED_RTS)
-void
-ioManagerDie (void)
-{
- StgWord8 byte = (StgWord8)IO_MANAGER_DIE;
- uint32_t i;
- int r;
-
- {
- // Shut down timer manager
- const int fd = RELAXED_LOAD(&timer_manager_control_wr_fd);
- if (0 <= fd) {
- r = write(fd, &byte, 1);
- if (r == -1) { sysErrorBelch("ioManagerDie: write"); }
- RELAXED_STORE(&timer_manager_control_wr_fd, -1);
- }
- }
-
- {
- // Shut down IO managers
- for (i=0; i < getNumCapabilities(); i++) {
- const int fd = RELAXED_LOAD(&getCapability(i)->iomgr->control_fd);
- if (0 <= fd) {
- r = write(fd, &byte, 1);
- if (r == -1) { sysErrorBelch("ioManagerDie: write"); }
- RELAXED_STORE(&getCapability(i)->iomgr->control_fd, -1);
- }
- }
- }
-}
-
-void
-ioManagerStartCap (Capability **cap)
-{
- rts_evalIO(cap,ensureIOManagerIsRunning_closure,NULL);
-}
-
-void
-ioManagerStart (void)
-{
- // Make sure the IO manager thread is running
- Capability *cap;
- if (SEQ_CST_LOAD(&timer_manager_control_wr_fd) < 0 || SEQ_CST_LOAD(&io_manager_wakeup_fd) < 0) {
- cap = rts_lock();
- ioManagerStartCap(&cap);
- rts_unlock(cap);
- }
-}
-#endif
-
#if !defined(THREADED_RTS)
#define N_PENDING_HANDLERS 16
@@ -260,31 +143,9 @@ generic_handler(int sig USED_IF_THREADS,
void *p STG_UNUSED)
{
#if defined(THREADED_RTS)
-
- StgWord8 buf[sizeof(siginfo_t) + 1];
- int r;
-
- buf[0] = sig;
- if (info == NULL) {
- // info may be NULL on Solaris (see #3790)
- memset(buf+1, 0, sizeof(siginfo_t));
- } else {
- memcpy(buf+1, info, sizeof(siginfo_t));
- }
-
- int timer_control_fd = RELAXED_LOAD(&timer_manager_control_wr_fd);
- if (0 <= timer_control_fd)
- {
- r = write(timer_control_fd, buf, sizeof(siginfo_t)+1);
- if (r == -1 && errno == EAGAIN) {
- errorBelch("lost signal due to full pipe: %d\n", sig);
- }
- }
-
- // If the IO manager hasn't told us what the FD of the write end
- // of its pipe is, there's not much we can do here, so just ignore
- // the signal..
-
+ //TODO: This calls MIO directly. We should go via IOManager API.
+ // The IOManager API should be extended to cover signals.
+ timerManagerNotifySignal(sig, info);
#else /* not THREADED_RTS */
/* Can't call allocate from here. Probably can't call malloc
=====================================
rts/posix/Signals.h
=====================================
@@ -27,18 +27,6 @@ void startSignalHandlers(Capability *cap);
void install_vtalrm_handler(int sig, TickProc handle_tick);
-/* Communicating with the IO manager thread (see GHC.Conc).
- *
- * TODO: these I/O manager things are not related to signals and ought to live
- * elsewhere, e.g. in a module specifically for the I/O manager.
- */
-void ioManagerWakeup (void);
-#if defined(THREADED_RTS)
-void ioManagerDie (void);
-void ioManagerStart (void);
-void ioManagerStartCap (/* inout */ Capability **cap);
-#endif
-
extern StgInt *signal_handlers;
#include "EndPrivate.h"
=====================================
rts/rts.cabal
=====================================
@@ -569,6 +569,7 @@ library
wasm/OSThreads.c
wasm/JSFFI.c
wasm/JSFFIGlobals.c
+ posix/FdWakeup.c
posix/Select.c
posix/Poll.c
posix/Timeout.c
@@ -581,6 +582,8 @@ library
posix/Ticker.c
posix/OSMem.c
posix/OSThreads.c
+ posix/FdWakeup.c
+ posix/MIO.c
posix/Poll.c
posix/Select.c
posix/Signals.c
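
The poll and select changes above share one pattern: a wakeup fd is always included in the set being waited on, and the await loop exits once that fd fires. The following is a minimal standalone sketch of that pattern, not the RTS code. It assumes a plain non-blocking pipe as the wakeup channel (the real FdWakeup.c is not shown in this diff and may well differ), and every `demo_*` name is hypothetical. Slot 0 of the full table is reserved for the wakeup fd and the I/O entries start at offset 1, mirroring the `aiop_poll_table == full_poll_table+1` invariant asserted in Poll.c.

```c
#include <fcntl.h>
#include <poll.h>
#include <stdbool.h>
#include <unistd.h>

#define DEMO_IO_SLOTS 3

/* Hypothetical stand-in for the per-capability I/O manager state. */
struct demo_iomgr {
    int wakeup_fd_r;                                   /* read end, polled */
    int wakeup_fd_w;                                   /* write end, signalled */
    struct pollfd full_poll_table[DEMO_IO_SLOTS + 1];  /* slot 0 = wakeup fd */
    struct pollfd *io_poll_table;                      /* full_poll_table + 1 */
    nfds_t n_io;                                       /* I/O entries in use */
};

/* Create the wakeup pipe and lay out the poll table. */
static bool demo_init(struct demo_iomgr *m)
{
    int fds[2];
    if (pipe(fds) != 0) return false;
    /* Non-blocking on both ends: draining must not block, and a full
     * pipe simply means a wakeup is already pending. */
    for (int i = 0; i < 2; i++) {
        int fl = fcntl(fds[i], F_GETFL);
        if (fl == -1 || fcntl(fds[i], F_SETFL, fl | O_NONBLOCK) == -1) {
            close(fds[0]);
            close(fds[1]);
            return false;
        }
    }
    m->wakeup_fd_r = fds[0];
    m->wakeup_fd_w = fds[1];
    m->full_poll_table[0] =
        (struct pollfd) { .fd = fds[0], .events = POLLIN, .revents = 0 };
    m->io_poll_table = m->full_poll_table + 1;
    for (int i = 0; i < DEMO_IO_SLOTS; i++) {
        /* poll() ignores entries with a negative fd. */
        m->io_poll_table[i] = (struct pollfd) { .fd = -1, .events = 0, .revents = 0 };
    }
    m->n_io = 0;
    return true;
}

/* Ask the poller to return early. */
static void demo_wakeup(struct demo_iomgr *m)
{
    char byte = 1;
    ssize_t r = write(m->wakeup_fd_w, &byte, 1);
    (void)r; /* EAGAIN here means a wakeup is already queued */
}

/* Poll the wakeup fd plus the I/O entries; true if woken via the wakeup fd. */
static bool demo_poll(struct demo_iomgr *m, int timeout_ms)
{
    int res = poll(m->full_poll_table, m->n_io + 1, timeout_ms);
    if (res > 0 && (m->full_poll_table[0].revents & POLLIN)) {
        char buf[64];
        /* Drain every pending wakeup byte so the fd goes quiet again. */
        while (read(m->wakeup_fd_r, buf, sizeof buf) > 0) { }
        return true;
    }
    return false;
}

static void demo_free(struct demo_iomgr *m)
{
    close(m->wakeup_fd_r);
    close(m->wakeup_fd_w);
}
```

A caller would run `demo_init` once, have another thread call `demo_wakeup`, and treat a `true` result from `demo_poll` the way the patched loops treat `wakeup`: as a reason to stop waiting even though no I/O or timeout completed.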
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/0810b12fe3b2f9ef216752e76edfe83...