[Git][ghc/ghc][wip/27113-retry] rts: Retry SIGPIPE delivery for interruptible FFI calls (#27113)
Zubin pushed to branch wip/27113-retry at Glasgow Haskell Compiler / GHC
Commits:
dc4d1c48 by Zubin Duggal at 2026-05-19T18:21:00+05:30
rts: Retry SIGPIPE delivery for interruptible FFI calls (#27113)
Closes a race in the threaded RTS where a SIGPIPE fired by throwTo to
cancel an interruptible foreign call may land in the user-space prologue
of the FFI wrapper, between suspendThread releasing the capability and
the kernel syscall starting. GHC's empty SIGPIPE handler swallows the
signal and the syscall then blocks forever.
The original retry implementation walked cap->suspended_ccalls from the
ticker and read TSO heap fields (why_blocked, blocked_exceptions, flags)
to decide whether to re-fire interruptOSThread. Review (review.md)
identified two real bugs in that design:
H1. TSO reads from outside cap ownership race with the moving GC's
evacuation of suspended_ccalls TSOs and with descheduling between
the load of incall->suspended_tso and the subsequent field reads.
The resulting spurious SIGPIPE then EINTRs unrelated unsafe syscalls
on the same OS thread once the worker has moved on.
H2. blocked_exceptions != END is not a live-throw predicate. A
MessageThrowTo overwritten with MSG_NULL by doneWithMsgThrowTo
persists on the queue under mask_, causing an EINTR storm in
throwErrnoIfMinus1Retry-style wrappers.
This commit reworks the fix to address both:
* A per-InCall atomic flag `interrupt_pending` is set by throwToMsg
after it fires interruptOSThread and cleared by resumeThread (before
recoverSuspendedTask, so the ticker cannot observe a stale set flag
after the InCall has been unlinked).
* throwToMsg's BlockedOnCCall_Interruptible search is extended to
walk every capability's suspended_ccalls (under cap->lock), so the
flag is set even when the target migrated to a different cap from
the throwing thread.
* doneWithMsgThrowTo's revoke path (removeFromQueues') re-scans the
target's blocked_exceptions for any non-MSG_NULL entries; if none
remain, clears the InCall's interrupt_pending so the retry stops.
Fixes H2.
* The ticker now reads only InCall fields (interrupt_pending,
task->id) — never TSO heap state — and fires interruptOSThread
while interrupt_pending is set. Fixes H1.
* The retry hook in handle_tick is gated on timer_disabled, matching
the context-switch path (review M1).
* T27113_c.c switched to int64_t to avoid 32-bit `long` overflow on
32-bit POSIX (review L1).
* Adds T27113c, a regression test for H2: a MessageThrowTo against
an interruptible-FFI target is revoked (via killing the throwing
thread), and the target's masked EINTR-retry loop is checked for
forward progress (no EINTR storm).
Build: T27113 and T27113c pass on ghci/threaded1/threaded2 ways; no
regressions in conc014/015/015a/016/017/017a/058/foreignInterruptible
on threaded2.
- - - - -
11 changed files:
- rts/RaiseAsync.c
- rts/Schedule.c
- rts/Schedule.h
- rts/Task.c
- rts/Task.h
- rts/Timer.c
- testsuite/tests/concurrent/should_run/T27113.hs
- testsuite/tests/concurrent/should_run/T27113_c.c
- + testsuite/tests/concurrent/should_run/T27113b.hs
- + testsuite/tests/concurrent/should_run/T27113c.hs
- testsuite/tests/concurrent/should_run/all.T
Changes:
=====================================
rts/RaiseAsync.c
=====================================
@@ -36,6 +36,11 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS,
Capability *target_cap USED_IF_THREADS,
MessageThrowTo *msg USED_IF_THREADS);
+#if defined(THREADED_RTS)
+static void clear_interrupt_pending_if_no_live_throw (Capability *cap,
+ StgTSO *target);
+#endif
+
/* -----------------------------------------------------------------------------
throwToSingleThreaded
@@ -338,7 +343,14 @@ check_target:
}
// nobody else can wake up this TSO after we claim the message
+#if defined(THREADED_RTS)
+ // #27113: snapshot m->target before doneWithMsgThrowTo nulls m.
+ StgTSO *m_target = m->target;
+#endif
doneWithMsgThrowTo(cap, m);
+#if defined(THREADED_RTS)
+ clear_interrupt_pending_if_no_live_throw(cap, m_target);
+#endif
raiseAsync(cap, target, msg->exception, false, NULL);
return THROWTO_SUCCESS;
@@ -433,11 +445,12 @@ check_target:
#if defined(THREADED_RTS)
{
Task *task = NULL;
- // walk suspended_ccalls to find the correct worker thread
+ InCall *target_incall = NULL;
InCall *incall;
for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
if (incall->suspended_tso == target) {
task = incall->task;
+ target_incall = incall;
break;
}
}
@@ -446,6 +459,8 @@ check_target:
if (!((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0))) {
interruptWorkerTask(task);
+ // #27113: ticker re-fires interruptOSThread while this is set.
+ RELEASE_STORE(&target_incall->interrupt_pending, 1);
}
return THROWTO_BLOCKED;
} else {
@@ -511,6 +526,9 @@ throwToSendMsg (Capability *cap STG_UNUSED,
// Block a throwTo message on the target TSO's blocked_exceptions
// queue. The current Capability must own the target TSO in order to
// modify the blocked_exceptions queue.
+//
+// #27113: cap->lock serialises the queue mutation with the cross-cap revoke
+// walk in clear_interrupt_pending_if_no_live_throw.
void
blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
{
@@ -520,8 +538,14 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
ASSERT(target->cap == cap);
dirty_TSO(cap,target); // we will modify the blocked_exceptions queue
+#if defined(THREADED_RTS)
+ ACQUIRE_LOCK(&cap->lock);
+#endif
msg->link = target->blocked_exceptions;
- target->blocked_exceptions = msg;
+ RELEASE_STORE(&target->blocked_exceptions, msg);
+#if defined(THREADED_RTS)
+ RELEASE_LOCK(&cap->lock);
+#endif
}
/* -----------------------------------------------------------------------------
@@ -581,6 +605,8 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
throwToSingleThreaded(cap, msg->target, msg->exception);
source = msg->source;
+ // #27113: msg->target == tso; its own resumeThread already cleared
+ // (or is about to clear) interrupt_pending, so no helper call here.
doneWithMsgThrowTo(cap, msg);
tryWakeupThread(cap, source);
return 1;
@@ -603,6 +629,7 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
i = lockClosure((StgClosure *)msg);
if (i != &stg_MSG_NULL_info) {
source = msg->source;
+ // #27113: msg->target == tso; its own resumeThread clears the flag.
doneWithMsgThrowTo(cap, msg);
tryWakeupThread(cap, source);
} else {
@@ -664,6 +691,54 @@ removeFromMVarBlockedQueue (StgTSO *tso)
tso->_link = END_TSO_QUEUE;
}
+#if defined(THREADED_RTS)
+// #27113: clear an interruptible-FFI target's interrupt_pending flag once
+// a revoke leaves no live (non-MSG_NULL) entry on its blocked_exceptions
+// queue. target->cap is stable while target is suspended in an interruptible
+// FFI call (no migration off the run queue), so the InCall lives on
+// target_cap->suspended_ccalls. target_cap->lock is taken unconditionally:
+// it serialises us with blockedThrowTo, suspendThread's re-arm, and
+// resumeThread's unlink — and also with the ticker's TRY_ACQUIRE_LOCK, so
+// the clear is not raced by a ticker reading the pre-clear flag.
+static void
+clear_interrupt_pending_if_no_live_throw (Capability *cap, StgTSO *target)
+{
+ if (target == NULL) return;
+ if (ACQUIRE_LOAD(&target->why_blocked) != BlockedOnCCall_Interruptible)
+ return;
+
+ Capability *target_cap = ACQUIRE_LOAD(&target->cap);
+ ACQUIRE_LOCK(&target_cap->lock);
+
+ // Find the InCall on target_cap. If target has already resumed (or
+ // wasn't really suspended on this cap), do nothing.
+ InCall *found = NULL;
+ for (InCall *ic = target_cap->suspended_ccalls; ic != NULL; ic = ic->next) {
+ if (ic->suspended_tso == target) { found = ic; break; }
+ }
+ if (found == NULL) {
+ RELEASE_LOCK(&target_cap->lock);
+ return;
+ }
+
+ bool live = false;
+ for (MessageThrowTo *t = ACQUIRE_LOAD(&target->blocked_exceptions);
+ t != END_BLOCKED_EXCEPTIONS_QUEUE;
+ t = (MessageThrowTo*)t->link) {
+ if (RELAXED_LOAD((StgWord*)&t->header.info)
+ != (StgWord)&stg_MSG_NULL_info) {
+ live = true; break;
+ }
+ }
+ if (!live) {
+ RELEASE_STORE(&found->interrupt_pending, 0);
+ }
+
+ RELEASE_LOCK(&target_cap->lock);
+ (void)cap;
+}
+#endif
+
static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
@@ -699,8 +774,17 @@ removeFromQueues(Capability *cap, StgTSO *tso)
// capabilities.
// ASSERT(m->header.info == &stg_WHITEHOLE_info);
+#if defined(THREADED_RTS)
+ // #27113: snapshot m->target before doneWithMsgThrowTo nulls m.
+ StgTSO *m_target = m->target;
+#endif
+
// unlock and revoke it at the same time
doneWithMsgThrowTo(cap, m);
+
+#if defined(THREADED_RTS)
+ clear_interrupt_pending_if_no_live_throw(cap, m_target);
+#endif
break;
}
=====================================
rts/Schedule.c
=====================================
@@ -2536,6 +2536,30 @@ suspendThread (StgRegTable *reg, bool interruptible)
suspendTask(cap,task);
cap->in_haskell = false;
+
+#if defined(THREADED_RTS)
+ // #27113: re-arm interrupt_pending if entering an interruptible call with
+ // a live throw already queued. Covers under-mask EINTR-retry: throwToMsg
+ // set the flag, resumeThread cleared it, and we're now re-entering pause.
+ // Under cap->lock, after suspendTask, so the scan and store are serialised
+ // with the revoke walk.
+ if (interruptible
+ && !((tso->flags & TSO_BLOCKEX) &&
+ ((tso->flags & TSO_INTERRUPTIBLE) == 0))) {
+ MessageThrowTo *head = ACQUIRE_LOAD(&tso->blocked_exceptions);
+ bool live = false;
+ for (MessageThrowTo *m = head;
+ m != END_BLOCKED_EXCEPTIONS_QUEUE;
+ m = (MessageThrowTo*)m->link) {
+ if (RELAXED_LOAD((StgWord*)&m->header.info)
+ != (StgWord)&stg_MSG_NULL_info) { live = true; break; }
+ }
+ if (live) {
+ RELEASE_STORE(&task->incall->interrupt_pending, 1);
+ }
+ }
+#endif
+
releaseCapability_(cap,false);
RELEASE_LOCK(&cap->lock);
@@ -2574,8 +2598,14 @@ resumeThread (void *task_)
// entry on the suspended_ccalls list will also have been
// migrated.
- // Remove the thread from the suspended list
+ // #27113: clear interrupt_pending and unlink under cap->lock so the
+ // ticker's TRY_ACQUIRE_LOCK serialises with us. Without the lock, the
+ // ticker can read a stale flag and SIGPIPE the worker after it has
+ // already returned from the FFI call onto an unrelated syscall.
+ ACQUIRE_LOCK(&cap->lock);
+ RELEASE_STORE(&incall->interrupt_pending, 0);
recoverSuspendedTask(cap,task);
+ RELEASE_LOCK(&cap->lock);
tso = incall->suspended_tso;
incall->suspended_tso = NULL;
@@ -2614,6 +2644,49 @@ resumeThread (void *task_)
return &cap->r;
}
+#if defined(THREADED_RTS)
+// #27113: re-fire interruptOSThread for InCalls whose interrupt_pending is
+// still set. Reads only InCall fields, never TSO heap state, so no GC race.
+//
+// cap->lock is held across the OS interrupt call. This pairs with the
+// same lock taken by resumeThread around its clear+unlink: when we fire
+// interruptOSThread, resumeThread is either blocked behind us on the lock
+// (signal lands harmlessly on a thread inside pthread sync) or has not yet
+// run (signal lands inside the FFI call where it belongs). Releasing the
+// lock before signalling would let resumeThread complete and re-enter
+// Haskell first, so SIGPIPE / CancelSynchronousIo would hit an unrelated
+// later syscall.
+//
+// The per-OS-call cost is small (~µs for pthread_kill on POSIX, ~tens of
+// µs for OpenThread+CancelSynchronousIo+CloseHandle on Windows), but the
+// per-tick lock-hold time scales linearly with the number of pending
+// interruptible calls on this cap. For typical workloads this is well
+// below the tick interval; FFI-heavy workloads with many concurrent
+// interruptible calls per cap may want empirical measurement.
+void
+retryInterruptibleSignals(void)
+{
+ bool any_pending = false;
+ uint32_t n = getNumCapabilities();
+ for (uint32_t i = 0; i < n; i++) {
+ Capability *cap = getCapability(i);
+ if (RELAXED_LOAD(&cap->n_suspended_ccalls) == 0) continue;
+ if (TRY_ACQUIRE_LOCK(&cap->lock) != 0) continue;
+ for (InCall *ic = cap->suspended_ccalls; ic != NULL; ic = ic->next) {
+ if (ACQUIRE_LOAD(&ic->interrupt_pending) == 0) continue;
+ // ic->task is set by newInCall and never cleared.
+ if (ic->task != NULL) {
+ any_pending = true;
+ interruptOSThread(ic->task->id);
+ }
+ }
+ RELEASE_LOCK(&cap->lock);
+ }
+ // Keep the timer alive; handle_tick's idle path would otherwise stop it.
+ if (any_pending) setRecentActivity(ACTIVITY_YES);
+}
+#endif
+
/* ---------------------------------------------------------------------------
* scheduleThread()
*
=====================================
rts/Schedule.h
=====================================
@@ -43,6 +43,10 @@ void scheduleThreadOn(Capability *cap, StgWord cpu, StgTSO *tso);
void wakeUpRts(void);
#endif
+#if defined(THREADED_RTS)
+void retryInterruptibleSignals(void);
+#endif
+
/* raiseExceptionHelper */
StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
=====================================
rts/Task.c
=====================================
@@ -267,6 +267,7 @@ newInCall (Task *task)
incall->next = NULL;
incall->prev = NULL;
incall->prev_stack = task->incall;
+ incall->interrupt_pending = 0;
task->incall = incall;
}
=====================================
rts/Task.h
=====================================
@@ -106,6 +106,15 @@ typedef struct InCall_ {
// Links InCalls onto suspended_ccalls, spare_incalls
struct InCall_ *prev;
struct InCall_ *next;
+
+ // #27113. Set (atomically) by throwToMsg once it has fired
+ // interruptOSThread for this call's worker; cleared by resumeThread
+ // (when the call returns) and by the revoke paths in
+ // doneWithMsgThrowTo (when all queued throws against the target are
+ // MSG_NULL). The ticker reads this and re-fires interruptOSThread
+ // while it is set. Not GC-managed — lives in malloc'd memory with
+ // the Task that owns this InCall.
+ StgWord interrupt_pending;
} InCall;
typedef struct Task_ {
=====================================
rts/Timer.c
=====================================
@@ -131,6 +131,11 @@ handle_tick(int unused STG_UNUSED)
flushEventLog(NULL);
}
}
+ // #27113 retry; gated on timer_disabled like the context-switch path
+ // above so that setNumCapabilities's resize pause is respected (#17289).
+ if (SEQ_CST_LOAD_ALWAYS(&timer_disabled) == 0) {
+ retryInterruptibleSignals();
+ }
#endif
/*
=====================================
testsuite/tests/concurrent/should_run/T27113.hs
=====================================
@@ -10,7 +10,7 @@ import System.IO
import System.Timeout
foreign import ccall interruptible "cpu_then_pause"
- c_cpu_then_pause :: CLong -> IO ()
+ c_cpu_then_pause :: CLLong -> IO ()
killDelay :: Int
killDelay = 200_000
@@ -18,7 +18,7 @@ killDelay = 200_000
joinTimeout :: Int
joinTimeout = 3_000_000
-runCase :: String -> CLong -> IO ()
+runCase :: String -> CLLong -> IO ()
runCase name spinNs = do
tid <- forkIO $
c_cpu_then_pause spinNs
=====================================
testsuite/tests/concurrent/should_run/T27113_c.c
=====================================
@@ -1,17 +1,18 @@
#include
participants (1)
-
Zubin (@wz1000)