Duncan Coutts pushed to branch wip/dcoutts/io-manager-tidy at Glasgow Haskell Compiler / GHC
Commits:
a23f0d0a by Duncan Coutts at 2026-06-19T08:09:20+01:00
Remove wakeupIOManager, ioManagerWakeup and setIOManagerWakeupFd
We no longer need wakeupIOManager for the threaded RTS case, so we can
remove it and the bits only needed to support it. This includes the
pipe/eventfd fd shared between the RTS and the in-library I/O manager
used for waking up the I/O manager thread. The pipe/eventfd still
exists, but it no longer has to be communicated to the RTS, since the
RTS no longer needs to use it.
So we remove the RTS API export setIOManagerWakeupFd, and remove uses of
it within the I/O managers in ghc-internal.
- - - - -
f4c88345 by Duncan Coutts at 2026-06-19T08:09:20+01:00
Add a new interruptIOManager API for the I/O managers
It will be used to interrupt awaitCompletedTimeoutsOrIO. Also update the
return type and docs for awaitCompletedTimeoutsOrIO to have it return
false when it gets interrupted, and have no useful post condition in
that case.
- - - - -
a0b052a9 by Duncan Coutts at 2026-06-19T08:09:20+01:00
Add interruptIOManager support for select I/O manager
Uses the FdWakeup mechanism.
- - - - -
89b15fd6 by Duncan Coutts at 2026-06-19T08:09:20+01:00
Add interruptIOManager support for poll I/O manager
Uses the FdWakeup mechanism.
A quirk we have to cope with is that we now need to poll one more fd --
the interrupt_fd_r -- but this fd has no corresponding entry in the
aiop_table. This is awkward since we have set up our aiop_poll_table to
be an auxiliary table with matching indices.
The solution this patch uses (and described in the comments) is to have
two tables: struct pollfd *aiop_poll_table, *full_poll_table;
and to have the aiop_poll_table alias the tail of the full_poll_table.
The head entry in the full_poll_table is the extra fd. So we poll the
full_poll_table, while the aiop_poll_table still has matching indices
with the aiop_table.
Hurrah for C aliasing rules.
- - - - -
9e503496 by Duncan Coutts at 2026-06-19T08:09:20+01:00
Add interruptIOManager support for win32 legacy I/O manager
And remove unused related helper resetAbandonRequestWait. It is not
called because the event is created in auto-reset mode, so never needs
to be reset manually.
- - - - -
8840ca5f by Duncan Coutts at 2026-06-19T08:09:20+01:00
Note lack of interruptIOManager support for WinIO I/O manager
Though there's a plausible design, we can't sanely test it at the moment
due to related WinIO bugs. Filed as issue #27403.
- - - - -
18 changed files:
- libraries/ghc-internal/src/GHC/Internal/Event/Control.hs
- libraries/ghc-internal/src/GHC/Internal/Event/Manager.hs
- libraries/ghc-internal/src/GHC/Internal/Event/TimerManager.hs
- rts/IOManager.c
- rts/IOManager.h
- rts/IOManagerInternals.h
- rts/RtsSymbols.c
- rts/include/rts/IOInterface.h
- rts/posix/MIO.c
- rts/posix/MIO.h
- rts/posix/Poll.c
- rts/posix/Poll.h
- rts/posix/Select.c
- rts/posix/Select.h
- rts/win32/AsyncMIO.c
- rts/win32/AsyncMIO.h
- rts/win32/AwaitEvent.c
- rts/win32/AwaitEvent.h
Changes:
=====================================
libraries/ghc-internal/src/GHC/Internal/Event/Control.hs
=====================================
@@ -39,7 +39,7 @@ import GHC.Internal.Show (Show)
import GHC.Internal.Types (Bool(..), Int, IO)
import GHC.Internal.Word (Word8)
import GHC.Internal.Foreign.C.Error (throwErrnoIfMinus1_, throwErrno, getErrno)
-import GHC.Internal.Foreign.C.Types (CInt(..), CSize(..))
+import GHC.Internal.Foreign.C.Types (CSize(..))
import GHC.Internal.Foreign.ForeignPtr (ForeignPtr, mallocForeignPtrBytes, withForeignPtr)
import GHC.Internal.Foreign.Marshal.Alloc (alloca, allocaBytes)
import GHC.Internal.Foreign.Marshal.Array (allocaArray)
@@ -51,7 +51,7 @@ import GHC.Internal.System.Posix.Types (Fd)
#if defined(HAVE_EVENTFD)
import GHC.Internal.Foreign.C.Error (throwErrnoIfMinus1, eBADF)
-import GHC.Internal.Foreign.C.Types (CULLong(..))
+import GHC.Internal.Foreign.C.Types (CInt(..), CULLong(..))
#else
import GHC.Internal.Foreign.C.Error (eAGAIN, eWOULDBLOCK, eBADF)
#endif
@@ -78,7 +78,10 @@ data Control = W {
, wakeupReadFd :: {-# UNPACK #-} !Fd
, wakeupWriteFd :: {-# UNPACK #-} !Fd
#endif
- , didRegisterWakeupFd :: !Bool
+ , didRegisterWakeupFd :: !Bool -- ^ Now redundant. Always False.
+ --TODO: remove ^^ this redundant field.
+ -- Technically, removing this is an API change to base. Sigh.
+
-- | Have this Control's fds been cleaned up?
, controlIsDead :: !(IORef Bool)
}
@@ -91,8 +94,8 @@ wakeupReadFd = controlEventFd
-- | Create the structure (usually a pipe) used for waking up the IO
-- manager thread from another thread.
-newControl :: Bool -> IO Control
-newControl shouldRegister = allocaArray 2 $ \fds -> do
+newControl :: IO Control
+newControl = allocaArray 2 $ \fds -> do
let createPipe = do
throwErrnoIfMinus1_ "pipe" $ c_pipe fds
rd <- peekElemOff fds 0
@@ -108,10 +111,8 @@ newControl shouldRegister = allocaArray 2 $ \fds -> do
ev <- throwErrnoIfMinus1 "eventfd" $ c_eventfd 0 0
setNonBlockingFD ev True
setCloseOnExec ev
- when shouldRegister $ c_setIOManagerWakeupFd ev
#else
(wake_rd, wake_wr) <- createPipe
- when shouldRegister $ c_setIOManagerWakeupFd wake_wr
#endif
isDead <- newIORef False
return W { controlReadFd = fromIntegral ctrl_rd
@@ -122,25 +123,16 @@ newControl shouldRegister = allocaArray 2 $ \fds -> do
, wakeupReadFd = fromIntegral wake_rd
, wakeupWriteFd = fromIntegral wake_wr
#endif
- , didRegisterWakeupFd = shouldRegister
+ , didRegisterWakeupFd = False
, controlIsDead = isDead
}
-- | Close the control structure used by the IO manager thread.
--- N.B. If this Control is the Control whose wakeup file was registered with
--- the RTS, then *BEFORE* the wakeup file is closed, we must call
--- c_setIOManagerWakeupFd (-1), so that the RTS does not try to use the wakeup
--- file after it has been closed.
---
--- Note, however, that even if we do the above, this function is still racy
--- since we do not synchronize between here and ioManagerWakeup.
--- ioManagerWakeup ignores failures that arise from this case.
closeControl :: Control -> IO ()
closeControl w = do
_ <- atomicSwapIORef (controlIsDead w) True
_ <- c_close . fromIntegral . controlReadFd $ w
_ <- c_close . fromIntegral . controlWriteFd $ w
- when (didRegisterWakeupFd w) $ c_setIOManagerWakeupFd (-1)
#if defined(HAVE_EVENTFD)
_ <- c_close . fromIntegral . controlEventFd $ w
#else
@@ -248,11 +240,3 @@ foreign import ccall unsafe "sys/eventfd.h eventfd"
foreign import ccall unsafe "sys/eventfd.h eventfd_write"
c_eventfd_write :: CInt -> CULLong -> IO CInt
#endif
-
-#if defined(wasm32_HOST_ARCH)
-c_setIOManagerWakeupFd :: CInt -> IO ()
-c_setIOManagerWakeupFd _ = return ()
-#else
-foreign import ccall unsafe "setIOManagerWakeupFd"
- c_setIOManagerWakeupFd :: CInt -> IO ()
-#endif
=====================================
libraries/ghc-internal/src/GHC/Internal/Event/Manager.hs
=====================================
@@ -194,7 +194,7 @@ newWith :: Backend -> IO EventManager
newWith be = do
iofds <- fmap (listArray (0, callbackArraySize-1)) $
replicateM callbackArraySize (newMVar =<< IT.new 8)
- ctrl <- newControl False
+ ctrl <- newControl
state <- newIORef Created
us <- newSource
_ <- mkWeakIORef state $ do
=====================================
libraries/ghc-internal/src/GHC/Internal/Event/TimerManager.hs
=====================================
@@ -126,7 +126,7 @@ new = newWith =<< newDefaultBackend
newWith :: Backend -> IO TimerManager
newWith be = do
timeouts <- newIORef Q.empty
- ctrl <- newControl True
+ ctrl <- newControl
state <- newIORef Created
us <- newSource
_ <- mkWeakIORef state $ do
=====================================
rts/IOManager.c
=====================================
@@ -343,9 +343,7 @@ void initCapabilityIOManager(CapIOManager *iomgr)
switch (iomgr_type) {
#if defined(IOMGR_ENABLED_SELECT)
case IO_MANAGER_SELECT:
- iomgr->blocked_queue_hd = END_TSO_QUEUE;
- iomgr->blocked_queue_tl = END_TSO_QUEUE;
- iomgr->sleeping_queue = END_TSO_QUEUE;
+ initCapabilityIOManagerSelect(iomgr);
break;
#endif
@@ -376,6 +374,12 @@ void initCapabilityIOManager(CapIOManager *iomgr)
void freeCapabilityIOManager(CapIOManager *iomgr)
{
switch (iomgr_type) {
+#if defined(IOMGR_ENABLED_SELECT)
+ case IO_MANAGER_SELECT:
+ freeCapabilityIOManagerSelect(iomgr);
+ break;
+#endif
+
#if defined(IOMGR_ENABLED_POLL)
case IO_MANAGER_POLL:
freeCapabilityIOManagerPoll(iomgr);
@@ -555,42 +559,6 @@ exitIOManager(bool wait_threads)
}
}
-/* Wakeup hook: called from the scheduler's wakeUpRts (currently only in
- * threaded mode).
- */
-void wakeupIOManager(void)
-{
- switch (iomgr_type) {
-
-#if defined(IOMGR_ENABLED_MIO_POSIX)
- case IO_MANAGER_MIO_POSIX:
- /* MIO Posix implementation in posix/Signals.c */
- ioManagerWakeup();
- break;
-#endif
-#if defined(IOMGR_ENABLED_MIO_WIN32)
- case IO_MANAGER_MIO_WIN32:
- /* MIO Windows implementation in win32/ThrIOManager.c
- * Yes, this is shared with the WinIO (threaded) impl.
- */
- ioManagerWakeup();
- break;
-#endif
-#if defined(IOMGR_ENABLED_WINIO)
- case IO_MANAGER_WINIO:
-#if defined(THREADED_RTS)
- /* WinIO threaded implementation in win32/ThrIOManager.c
- * Yes, this is shared with the MIO win32 impl.
- */
- ioManagerWakeup();
-#endif
- break;
-#endif
- default:
- break;
- }
-}
-
void markCapabilityIOManager(evac_fn evac, void *user, CapIOManager *iomgr)
{
switch (iomgr_type) {
@@ -764,19 +732,20 @@ void pollCompletedTimeoutsOrIO(CapIOManager *iomgr)
}
-void awaitCompletedTimeoutsOrIO(CapIOManager *iomgr)
+bool awaitCompletedTimeoutsOrIO(CapIOManager *iomgr)
{
debugTrace(DEBUG_iomanager, "waiting for completed IO or timeouts");
+ bool completed = true; // wait completed or interrupted?
switch (iomgr_type) {
#if defined(IOMGR_ENABLED_SELECT)
case IO_MANAGER_SELECT:
- awaitCompletedTimeoutsOrIOSelect(iomgr, true);
+ completed = awaitCompletedTimeoutsOrIOSelect(iomgr, true);
break;
#endif
#if defined(IOMGR_ENABLED_POLL)
case IO_MANAGER_POLL:
- awaitCompletedTimeoutsOrIOPoll(iomgr);
+ completed = awaitCompletedTimeoutsOrIOPoll(iomgr);
break;
#endif
@@ -788,13 +757,56 @@ void awaitCompletedTimeoutsOrIO(CapIOManager *iomgr)
#if defined(IOMGR_ENABLED_WINIO)
case IO_MANAGER_WINIO:
#endif
- awaitCompletedTimeoutsOrIOWin32(iomgr->cap, true);
+ completed = awaitCompletedTimeoutsOrIOWin32(iomgr->cap, true);
break;
#endif
default:
- barf("pollCompletedTimeoutsOrIO not implemented");
+ barf("awaitCompletedTimeoutsOrIO not implemented");
+ }
+ ASSERT(!emptyRunQueue(iomgr->cap) ||
+ getSchedState() != SCHED_RUNNING ||
+ !completed);
+ return completed;
+}
+
+
+/* Interrupt the I/O manager if it is blocked in awaitCompletedTimeoutsOrIO,
+ * causing it to return early and return false.
+ */
+void interruptIOManager(CapIOManager *iomgr)
+{
+ debugTrace(DEBUG_iomanager, "Interrupting the I/O manager...");
+ switch (iomgr_type) {
+
+#if defined(IOMGR_ENABLED_SELECT)
+ case IO_MANAGER_SELECT:
+ interruptIOManagerSelect(iomgr);
+ break;
+#endif
+
+#if defined(IOMGR_ENABLED_POLL)
+ case IO_MANAGER_POLL:
+ interruptIOManagerPoll(iomgr);
+ break;
+#endif
+
+#if defined(IOMGR_ENABLED_WIN32_LEGACY)
+ case IO_MANAGER_WIN32_LEGACY:
+ abandonRequestWait();
+ break;
+#endif
+
+#if defined(IOMGR_ENABLED_WINIO)
+ case IO_MANAGER_WINIO:
+ /* FIXME: no support yet for interrupting in WinIO I/O manager
+ * See issue #27403
+ */
+ break;
+#endif
+
+ default:
+ break;
}
- ASSERT(!emptyRunQueue(iomgr->cap) || getSchedState() != SCHED_RUNNING);
}
=====================================
rts/IOManager.h
=====================================
@@ -306,23 +306,6 @@ void stopIOManager(void);
void exitIOManager(bool wait_threads);
-/* Wakeup hook: called from the scheduler's wakeUpRts (currently only in
- * threaded mode).
- *
- * The I/O manager can be blocked waiting on I/O or timers. Sometimes there are
- * other external events where we need to wake up the I/O manager and return
- * to the schedulr.
- *
- * At the moment, all the non-threaded I/O managers will do this automagically
- * since a signal will interrupt any waiting system calls, so at the moment
- * the implementation for the non-threaded I/O managers does nothing.
- *
- * For the I/O managers in threaded mode, this arranges to unblock the I/O
- * manager if it waa blocked waiting.
- */
-void wakeupIOManager(void);
-
-
/* GC hook: mark any per-capability GC roots the I/O manager uses.
*/
void markCapabilityIOManager(evac_fn evac, void *user, CapIOManager *iomgr);
@@ -382,20 +365,32 @@ bool anyPendingTimeoutsOrIO(CapIOManager *iomgr);
*/
void pollCompletedTimeoutsOrIO(CapIOManager *iomgr);
- /* If there are any completed I/O operations or expired timers, process the
+/* If there are any completed I/O operations or expired timers, process the
* completions as appropriate. If there are none, wait until I/O or a timer
* does complete (or we get a signal with a handler) and process the
* completions as appropriate.
*
- * Upon return this guarantees that the scheduler run queue is non-empty or
- * that the scheduler is no longer in the running state. Succinctly, the
- * post-condition is (!emptyRunQueue(cap) || getSchedState() != SCHED_RUNNING).
+ * Upon returning true this guarantees that the scheduler run queue is
+ * non-empty or that the scheduler is no longer in the running state.
+ * Succinctly, the post-condition in the return true case is
+ * (!emptyRunQueue(cap) || getSchedState() != SCHED_RUNNING).
+ * A false result means the wait was interrupted by interruptIOManager, and
+ * there is no post-condition in this case.
*
* This is only expected to be called if anyPendingTimeoutsOrIO() returns true,
* i.e. there actually is something to wait for.
*
* Called from schedule() both *before* and *after* scheduleDetectDeadlock().
*/
-void awaitCompletedTimeoutsOrIO(CapIOManager *iomgr);
+bool awaitCompletedTimeoutsOrIO(CapIOManager *iomgr);
+
+/* Interrupt the I/O manager if it is blocked in awaitCompletedTimeoutsOrIO,
+ * causing it to return early.
+ *
+ * Its use is inherently concurrent and racy: the interrupt races against any
+ * I/O or timer completion. This does not matter for the intended use case of
+ * returning control to the scheduler.
+ */
+void interruptIOManager(CapIOManager *iomgr);
#include "EndPrivate.h"
=====================================
rts/IOManagerInternals.h
=====================================
@@ -46,6 +46,11 @@ struct _CapIOManager {
StgTSO *sleeping_queue;
#endif
+#if defined(IOMGR_ENABLED_SELECT) || defined(IOMGR_ENABLED_POLL)
+ /* FDs for waking up the I/O manager when it is blocked waiting */
+ int interrupt_fd_r, interrupt_fd_w;
+#endif
+
#if defined(IOMGR_ENABLED_POLL)
/* AIOP and timeout collections shared by several I/O manager impls */
ClosureTable aiop_table;
@@ -53,8 +58,11 @@ struct _CapIOManager {
#endif
#if defined(IOMGR_ENABLED_POLL)
- /* Auxiliary table with size and indexes matching the aiop_table */
- struct pollfd *aiop_poll_table;
+ /* Auxiliary table with size and indexes matching the aiop_table. This is
+ * aliased to the tail of the full poll table, which has a head entry for
+ * the interrupt_fd_r above, so we can also poll that fd.
+ */
+ struct pollfd *aiop_poll_table, *full_poll_table;
#endif
#if defined(IOMGR_ENABLED_WIN32_LEGACY)
=====================================
rts/RtsSymbols.c
=====================================
@@ -265,7 +265,6 @@ extern char **environ;
#define RTS_USER_SIGNALS_SYMBOLS \
SymI_HasProto(setIOManagerControlFd) \
SymI_HasProto(setTimerManagerControlFd) \
- SymI_HasProto(setIOManagerWakeupFd) \
SymI_HasProto(blockUserSignals) \
SymI_HasProto(unblockUserSignals)
#else
=====================================
rts/include/rts/IOInterface.h
=====================================
@@ -33,7 +33,6 @@ void ioManagerFinished (void);
void setIOManagerControlFd (uint32_t cap_no, int fd);
void setTimerManagerControlFd(int fd);
-void setIOManagerWakeupFd (int fd);
#endif
=====================================
rts/posix/MIO.c
=====================================
@@ -30,27 +30,16 @@
#include
// Here's the pipe into which we will send our signals
-static int io_manager_wakeup_fd = -1;
static int timer_manager_control_wr_fd = -1;
// TODO: Eliminate these globals. Put then into the CapIOManager, but the
// problem is these are shared across all caps, not per cap.
-#define IO_MANAGER_WAKEUP 0xff
#define IO_MANAGER_DIE 0xfe
-#define IO_MANAGER_SYNC 0xfd
void setTimerManagerControlFd(int fd) {
RELAXED_STORE(&timer_manager_control_wr_fd, fd);
}
-void
-setIOManagerWakeupFd (int fd)
-{
- // only called when THREADED_RTS, but unconditionally
- // compiled here because GHC.Event.Control depends on it.
- SEQ_CST_STORE(&io_manager_wakeup_fd, fd);
-}
-
#if defined(THREADED_RTS)
void timerManagerNotifySignal(int sig, siginfo_t *info)
{
@@ -81,40 +70,6 @@ void timerManagerNotifySignal(int sig, siginfo_t *info)
#endif
-/* -----------------------------------------------------------------------------
- * Wake up at least one IO or timer manager HS thread.
- * -------------------------------------------------------------------------- */
-void
-ioManagerWakeup (void)
-{
- int r;
- const int wakeup_fd = SEQ_CST_LOAD(&io_manager_wakeup_fd);
- // Wake up the IO Manager thread by sending a byte down its pipe
- if (wakeup_fd >= 0) {
-#if defined(HAVE_EVENTFD)
- StgWord64 n = (StgWord64)IO_MANAGER_WAKEUP;
- r = write(wakeup_fd, (char *) &n, 8);
-#else
- StgWord8 byte = (StgWord8)IO_MANAGER_WAKEUP;
- r = write(wakeup_fd, &byte, 1);
-#endif
- /* N.B. If the TimerManager is shutting down as we run this
- * then there is a possibility that our first read of
- * io_manager_wakeup_fd is non-negative, but before we get to the
- * write the file is closed. If this occurs, io_manager_wakeup_fd
- * will be written into with -1 (GHC.Event.Control does this prior
- * to closing), so checking this allows us to distinguish this case.
- * To ensure we observe the correct ordering, we declare the
- * io_manager_wakeup_fd as volatile.
- * Since this is not an error condition, we do not print the error
- * message in this case.
- */
- if (r == -1 && SEQ_CST_LOAD(&io_manager_wakeup_fd) >= 0) {
- sysErrorBelch("ioManagerWakeup: write");
- }
- }
-}
-
#if defined(THREADED_RTS)
void
ioManagerDie (void)
@@ -157,7 +112,7 @@ ioManagerStart (void)
{
// Make sure the IO manager thread is running
Capability *cap;
- if (SEQ_CST_LOAD(&timer_manager_control_wr_fd) < 0 || SEQ_CST_LOAD(&io_manager_wakeup_fd) < 0) {
+ if (SEQ_CST_LOAD(&timer_manager_control_wr_fd) < 0) {
cap = rts_lock();
ioManagerStartCap(&cap);
rts_unlock(cap);
=====================================
rts/posix/MIO.h
=====================================
@@ -18,7 +18,6 @@
/* Communicating with the IO manager thread (see GHC.Conc).
*/
-void ioManagerWakeup (void);
#if defined(THREADED_RTS)
void ioManagerDie (void);
void ioManagerStart (void);
=====================================
rts/posix/Poll.c
=====================================
@@ -41,6 +41,7 @@
#include "IOManagerInternals.h"
#include "Timeout.h"
+#include "FdWakeup.h"
/******************************************************************************
@@ -107,8 +108,9 @@ timeout (if any) as the poll() timeout parameter.
The CapIOManager structure for this I/O manager contains:
ClosureTable aiop_table;
- struct pollfd *aiop_poll_table;
+ struct pollfd *aiop_poll_table, *full_poll_table;
StgTimeoutQueue *timeout_queue;
+ int interrupt_fd_r, interrupt_fd_w;
We also support the Linux-specific ppoll API which supports higher resolution
time delays -- nanoseconds rather than milliseconds as in classic poll(). It
@@ -117,6 +119,15 @@ also allows the signal mask to be adjusted, but we do not make use of this.
int ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *tmo_p, const sigset_t *sigmask);
+We have both aiop_poll_table and full_poll_table. This is to cope with needing
+to wait on the special extra file descriptor interrupt_fd_r. This fd is used to
+support waking the I/O manager when we are blocked in a poll call. This
+requires waiting on an extra fd that has no corresponding entry in the
+aiop_table. To manage this quirk, we alias the aiop_poll_table to be the tail
+of the full_poll_table and have the first entry of the full_poll_table be the
+interrupt_fd_r. This means the aiop_poll_table indices match up exactly with
+the aiop_table, but still allows the full_poll_table to have an extra entry.
+
******************************************************************************/
/* Forward declarations */
@@ -129,16 +140,25 @@ static void reportPollError(int res, nfds_t nfds) STG_NORETURN;
void initCapabilityIOManagerPoll(CapIOManager *iomgr)
{
initClosureTable(&iomgr->aiop_table, ClosureTableCompact);
- iomgr->aiop_poll_table = NULL;
iomgr->timeout_queue = emptyTimeoutQueue();
+
+ newFdWakeup(&iomgr->interrupt_fd_r, &iomgr->interrupt_fd_w);
+
+ iomgr->full_poll_table = stgMallocBytes(sizeof(struct pollfd) /* size 1 */,
+ "initCapabilityIOManagerPoll");
+ iomgr->full_poll_table[0] = (struct pollfd) {
+ .fd = iomgr->interrupt_fd_r,
+ .events = POLLIN,
+ .revents = 0
+ };
+ iomgr->aiop_poll_table = iomgr->full_poll_table+1; /* hence empty */
}
void freeCapabilityIOManagerPoll(CapIOManager *iomgr)
{
- if (iomgr->aiop_poll_table) {
- stgFree(iomgr->aiop_poll_table);
- }
+ stgFree(iomgr->full_poll_table);
+ closeFdWakeup(iomgr->interrupt_fd_r, iomgr->interrupt_fd_w);
}
@@ -283,7 +303,7 @@ static void notifyIOCompletion(CapIOManager *iomgr, StgAsyncIOOp *aiop)
}
-static void processIOCompletions(CapIOManager *iomgr, int ncompletions)
+static bool processIOCompletions(CapIOManager *iomgr, int ncompletions)
{
/* The scheme we use with poll is that we have a dense poll table, and a
* corresponding table that maps to the closure table index. The poll
@@ -293,6 +313,19 @@ static void processIOCompletions(CapIOManager *iomgr, int ncompletions)
*/
debugTrace(DEBUG_iomanager, "processIOCompletions(ncompletions = %d)",
ncompletions);
+
+ bool interrupt;
+ /* If the interrupt_fd_r is ready, collect it */
+ if (iomgr->full_poll_table[0].revents) {
+ ASSERT(iomgr->full_poll_table[0].fd == iomgr->interrupt_fd_r);
+ collectFdWakeup(iomgr->interrupt_fd_r);
+ ncompletions--;
+ interrupt = true;
+ debugTrace(DEBUG_iomanager, "Received interrupt in poll I/O manager");
+ } else {
+ interrupt = false;
+ }
+
struct pollfd *aiop_poll_table = iomgr->aiop_poll_table;
int n = ncompletions;
int i = 0;
@@ -345,11 +378,14 @@ static void processIOCompletions(CapIOManager *iomgr, int ncompletions)
i++;
}
}
+ return interrupt;
}
void pollCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
{
+ ASSERT(iomgr->aiop_poll_table == iomgr->full_poll_table+1);
+
if (!isEmptyTimeoutQueue(iomgr->timeout_queue)) {
Time now = getProcessElapsedTime();
processTimeoutCompletions(iomgr, now);
@@ -357,20 +393,20 @@ void pollCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
if (!isEmptyClosureTable(&iomgr->aiop_table)) {
- nfds_t nfds = sizeClosureTable(&iomgr->aiop_table);
+ nfds_t nfds = sizeClosureTable(&iomgr->aiop_table) + 1;
/* Poll for I/O readiness, without waiting. */
#if defined(HAVE_DECL_PPOLL) && HAVE_DECL_PPOLL == 1
/* We could use poll here, since we use no timeout, but for
consistency we use the same syscall as at the other call site. */
struct timespec tv = (struct timespec) { .tv_sec = 0, .tv_nsec = 0 };
- int res = ppoll(iomgr->aiop_poll_table, nfds, &tv, NULL);
+ int res = ppoll(iomgr->full_poll_table, nfds, &tv, NULL);
debugTrace(DEBUG_iomanager,
"ppoll(nfds = %d, timeout.sec = 0, timeout.nsec = 0) = %d",
nfds, res);
#else
- int res = poll(iomgr->aiop_poll_table, nfds, 0);
+ int res = poll(iomgr->full_poll_table, nfds, 0);
debugTrace(DEBUG_iomanager,
"poll(nfds = %d, timeout_ms = 0) = %d",
@@ -396,8 +432,12 @@ void pollCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
}
-void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
+bool awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
{
+ bool interrupt = false; /* got woken up via interruptIOManager */
+
+ ASSERT(iomgr->aiop_poll_table == iomgr->full_poll_table+1);
+
/* Loop until we've woken up some threads. This loop is needed because the
* poll() timing isn't accurate, we sometimes sleep for a while but not
* long enough to wake up a thread in a threadDelay. Or we may need to
@@ -430,9 +470,9 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
#endif
/* Check for I/O readiness, possibly waiting. */
- nfds_t nfds = sizeClosureTable(&iomgr->aiop_table);
+ nfds_t nfds = sizeClosureTable(&iomgr->aiop_table) + 1;
#if defined(HAVE_DECL_PPOLL) && HAVE_DECL_PPOLL == 1
- int res = ppoll(iomgr->aiop_poll_table, nfds, timeout_ns, NULL);
+ int res = ppoll(iomgr->full_poll_table, nfds, timeout_ns, NULL);
debugTrace(DEBUG_iomanager,
"ppoll(nfds = %d, timeout.sec = %d, timeout.nsec = %d) = %d",
@@ -440,7 +480,7 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
timeout_ns == NULL ? 0 : timeout_ns->tv_nsec,
res);
#else
- int res = poll(iomgr->aiop_poll_table, nfds, timeout_ms);
+ int res = poll(iomgr->full_poll_table, nfds, timeout_ms);
debugTrace(DEBUG_iomanager,
"poll(nfds = %d, timeout_ms = %d) = %d",
@@ -462,7 +502,7 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
} else if (res > 0) {
int ncompletions = res;
ASSERT(ncompletions <= (int)nfds);
- processIOCompletions(iomgr, ncompletions);
+ interrupt = processIOCompletions(iomgr, ncompletions);
// FIXME: do we also need to check for timeout completions now?
// we have a non-empty queue, but if !wait then we have also moved
// on and so we sould check for timeouts.
@@ -490,7 +530,9 @@ void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr)
}
} while (emptyRunQueue(iomgr->cap)
+ && !interrupt
&& (getSchedState() == SCHED_RUNNING));
+ return !interrupt;
}
static void reportPollError(int res, nfds_t nfds)
@@ -509,6 +551,12 @@ static void reportPollError(int res, nfds_t nfds)
}
+void interruptIOManagerPoll(CapIOManager *iomgr)
+{
+ sendFdWakeup(iomgr->interrupt_fd_w);
+}
+
+
/* Helper function to double the size of the aiop_table and aiop_poll_table.
*/
static bool enlargeTables(CapIOManager *iomgr)
@@ -519,13 +567,17 @@ static bool enlargeTables(CapIOManager *iomgr)
bool ok = enlargeClosureTable(iomgr->cap, &iomgr->aiop_table, newcapacity);
if (RTS_UNLIKELY(!ok)) return false;
- /* Update the auxiliary aiop_poll_table to match */
- struct pollfd *aiop_poll_table;
- aiop_poll_table = stgReallocBytes(iomgr->aiop_poll_table,
- sizeof(struct pollfd) * newcapacity,
- "Poll.c: enlargeTables");
- iomgr->aiop_poll_table = aiop_poll_table;
+ /* Update the auxiliary aiop_poll_table to match. The full_poll_table is
+ * one bigger than the aiop_poll_table, since it has an extra entry at the
+ * front for interrupt_fd_r, with no corresponding aiop. */
+ iomgr->full_poll_table =
+ stgReallocBytes(iomgr->full_poll_table,
+ sizeof(struct pollfd) * (newcapacity+1),
+ "Poll.c: enlargeTables");
+ iomgr->aiop_poll_table = iomgr->full_poll_table+1;
+
/* Initialise the new part of the aiop_poll_table */
+ struct pollfd *aiop_poll_table = iomgr->aiop_poll_table;
for (int i = oldcapacity; i < newcapacity; i++) {
aiop_poll_table[i] = (struct pollfd) {
.fd = -1,
=====================================
rts/posix/Poll.h
=====================================
@@ -32,7 +32,8 @@ void asyncIOCancelPoll(CapIOManager *iomgr, StgAsyncIOOp *aiop);
/* Scheduler operations */
bool anyPendingTimeoutsOrIOPoll(CapIOManager *iomgr);
void pollCompletedTimeoutsOrIOPoll(CapIOManager *iomgr);
-void awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr);
+bool awaitCompletedTimeoutsOrIOPoll(CapIOManager *iomgr);
+void interruptIOManagerPoll(CapIOManager *iomgr);
#endif /* IOMGR_ENABLED_POLL */
=====================================
rts/posix/Select.c
=====================================
@@ -22,6 +22,7 @@
#include "IOManagerInternals.h"
#include "Stats.h"
#include "GetTime.h"
+#include "FdWakeup.h"
# if defined(HAVE_SYS_SELECT_H)
# include
@@ -54,6 +55,25 @@
#define TimeToLowResTimeRoundUp(t) (t)
#endif
+void initCapabilityIOManagerSelect(CapIOManager *iomgr)
+{
+ iomgr->blocked_queue_hd = END_TSO_QUEUE;
+ iomgr->blocked_queue_tl = END_TSO_QUEUE;
+ iomgr->sleeping_queue = END_TSO_QUEUE;
+
+ newFdWakeup(&iomgr->interrupt_fd_r, &iomgr->interrupt_fd_w);
+}
+
+void freeCapabilityIOManagerSelect(CapIOManager *iomgr)
+{
+ closeFdWakeup(iomgr->interrupt_fd_r, iomgr->interrupt_fd_w);
+}
+
+void interruptIOManagerSelect(CapIOManager *iomgr)
+{
+ sendFdWakeup(iomgr->interrupt_fd_w);
+}
+
/*
* Return the time since the program started, in LowResTime,
* rounded down.
@@ -215,7 +235,7 @@ static enum FdState fdPollWriteState (int fd)
* not write handles.
*
*/
-void
+bool
awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
{
StgTSO *tso, *prev, *next;
@@ -225,6 +245,7 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
bool seen_bad_fd = false;
struct timeval tv, *ptv;
LowResTime now;
+ bool interrupt = false; /* got interrupted up via interruptIOManager */
IF_DEBUG(scheduler,
debugBelch("scheduler: checking for threads blocked on I/O");
@@ -243,7 +264,7 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
now = getLowResTimeOfDay();
if (wakeUpSleepingThreads(iomgr, now)) {
- return;
+ return true;
}
/*
@@ -252,6 +273,13 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
FD_ZERO(&rfd);
FD_ZERO(&wfd);
+ /* We're always interested in our interrupt fd */
+ {
+ int fd = iomgr->interrupt_fd_r;
+ maxfd = (fd > maxfd) ? fd : maxfd;
+ FD_SET(fd, &rfd);
+ }
+
for(tso = iomgr->blocked_queue_hd;
tso != END_TSO_QUEUE;
tso = next) {
@@ -354,14 +382,14 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
#if defined(RTS_USER_SIGNALS)
if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
startSignalHandlers(iomgr->cap);
- return; /* still hold the lock */
+ return true; /* still hold the lock */
}
#endif
/* we were interrupted, return to the scheduler immediately.
*/
if (getSchedState() >= SCHED_INTERRUPTING) {
- return; /* still hold the lock */
+ return true; /* still hold the lock */
}
/* check for threads that need waking up
@@ -372,10 +400,17 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
* I/O and run them.
*/
if (!emptyRunQueue(iomgr->cap)) {
- return; /* still hold the lock */
+ return true; /* still hold the lock */
}
}
+ /* If the interrupt_fd_r is ready, collect it */
+ if (FD_ISSET(iomgr->interrupt_fd_r, &rfd)) {
+ collectFdWakeup(iomgr->interrupt_fd_r);
+ interrupt = true;
+ debugTrace(DEBUG_iomanager, "Received interrupt in select I/O manager");
+ }
+
/* Step through the waiting queue, unblocking every thread that now has
* a file descriptor in a ready state.
*/
@@ -458,7 +493,9 @@ awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait)
}
} while (wait && getSchedState() == SCHED_RUNNING
- && emptyRunQueue(iomgr->cap));
+ && emptyRunQueue(iomgr->cap)
+ && !interrupt);
+ return !interrupt;
}
#endif /* IOMGR_ENABLED_SELECT */
=====================================
rts/posix/Select.h
=====================================
@@ -15,7 +15,12 @@ typedef StgWord LowResTime;
LowResTime getDelayTarget (HsInt us);
-void awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait);
+void initCapabilityIOManagerSelect(CapIOManager *iomgr);
+void freeCapabilityIOManagerSelect(CapIOManager *iomgr);
+void wakeupIOManagerSelect(CapIOManager *iomgr);
+
+bool awaitCompletedTimeoutsOrIOSelect(CapIOManager *iomgr, bool wait);
+void interruptIOManagerSelect(CapIOManager *iomgr);
#include "EndPrivate.h"
=====================================
rts/win32/AsyncMIO.c
=====================================
@@ -221,8 +221,12 @@ shutdownAsyncIO(bool wait_threads)
* requests to make further progress. In the latter scenario,
* awaitRequests() will simply block waiting for worker threads
* to complete if the 'completedTable' is empty.
+ *
+ * The result reports if the wait completed successfully (typically with some
+ * work available), or was interrupted by abandonRequestWait(), with true
+ * meaning completed, and false meaning interrupted.
*/
-int
+bool
awaitRequests(bool wait)
{
#if !defined(THREADED_RTS)
@@ -246,7 +250,7 @@ start:
#endif
) {
OS_RELEASE_LOCK(&queue_lock);
- return 0;
+ return true;
}
if (completed_hw == 0) {
// empty table, drop lock and wait
@@ -259,22 +263,24 @@ start:
// a request was completed
break;
case WAIT_OBJECT_0 + 1:
+ // abandon_req_wait signaled, by abandonRequestWait()
+ return false;
case WAIT_TIMEOUT:
// timeout (unlikely) or told to abandon waiting
- return 0;
+ return true;
case WAIT_FAILED: {
DWORD dw = GetLastError();
fprintf(stderr, "awaitRequests: wait failed -- "
"error code: %lu\n", dw); fflush(stderr);
- return 0;
+ return true;
}
default:
fprintf(stderr, "awaitRequests: unexpected wait return "
"code %lu\n", dwRes); fflush(stderr);
- return 0;
+ return true;
}
} else {
- return 0;
+ return true;
}
goto start;
} else {
@@ -352,7 +358,7 @@ start:
completed_hw = 0;
ResetEvent(completed_req_event);
OS_RELEASE_LOCK(&queue_lock);
- return 1;
+ return true;
}
#endif /* !THREADED_RTS */
}
@@ -383,12 +389,6 @@ abandonRequestWait( void )
interruptIOManagerEvent ();
}
-void
-resetAbandonRequestWait( void )
-{
- ResetEvent(abandon_req_wait);
-}
-
#endif /* !defined(THREADED_RTS) */
HsInt rts_EINTR(void)
=====================================
rts/win32/AsyncMIO.h
=====================================
@@ -25,7 +25,7 @@ extern unsigned int addDoProcRequest(void* proc, void* param);
extern int startupAsyncIO(void);
extern void shutdownAsyncIO(bool wait_threads);
-extern int awaitRequests(bool wait);
+extern bool awaitRequests(bool wait);
extern void abandonRequestWait(void);
extern void resetAbandonRequestWait(void);
=====================================
rts/win32/AwaitEvent.c
=====================================
@@ -28,17 +28,21 @@
// Protected by sched_mutex.
static bool workerWaitingForRequests = false;
-void
+bool
awaitCompletedTimeoutsOrIOWin32(Capability *cap, bool wait)
{
+ bool interrupt = false;
do {
/* Try to de-queue completed IO requests
*/
workerWaitingForRequests = true;
if (is_io_mng_native_p())
awaitAsyncRequests(wait);
+ /* FIXME: no support yet for interrupting in WinIO I/O manager
+ * See issue #27403
+ */
else
- awaitRequests(wait);
+ interrupt = !awaitRequests(wait);
workerWaitingForRequests = false;
// If a signal was raised, we need to service it
@@ -47,11 +51,12 @@ awaitCompletedTimeoutsOrIOWin32(Capability *cap, bool wait)
// does it and I'm feeling too paranoid to refactor it today --SDM
if (stg_pending_events != 0) {
startSignalHandlers(cap);
- return;
+ // This will normally cause emptyRunQueue to become false and
+ // thus we will drop out of the loop.
}
- // The return value from awaitRequests() is a red herring: ignore
- // it. Return to the scheduler if !wait, or
+ // The return value from awaitRequests() reports if it was interrupted by
+ // abandonRequestWait(). Return to the scheduler if !wait, or
//
// - we were interrupted
// - the run-queue is now non- empty
@@ -59,6 +64,8 @@ awaitCompletedTimeoutsOrIOWin32(Capability *cap, bool wait)
} while (wait
&& getSchedState() == SCHED_RUNNING
&& emptyRunQueue(cap)
+ && !interrupt
);
+ return !interrupt;
}
#endif
=====================================
rts/win32/AwaitEvent.h
=====================================
@@ -2,6 +2,6 @@
#include "BeginPrivate.h"
-void awaitCompletedTimeoutsOrIOWin32(Capability *cap, bool wait);
+bool awaitCompletedTimeoutsOrIOWin32(Capability *cap, bool wait);
#include "EndPrivate.h"
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/b3c5699faf19dfa81d8f98170048eb8...
--
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/b3c5699faf19dfa81d8f98170048eb8...
You're receiving this email because of your account on gitlab.haskell.org.