[Git][ghc/ghc][wip/27113-retry] rts: Retry SIGPIPE delivery for interruptible FFI calls (#27113)
Zubin pushed to branch wip/27113-retry at Glasgow Haskell Compiler / GHC
Commits:
a422bdb4 by Zubin Duggal at 2026-05-19T18:32:23+05:30
rts: Retry SIGPIPE delivery for interruptible FFI calls (#27113)
Closes a race in the threaded RTS where a SIGPIPE fired by throwTo to
cancel an interruptible foreign call may land in the user-space prologue
of the FFI wrapper, between suspendThread releasing the capability and
the kernel syscall starting. GHC's empty SIGPIPE handler swallows the
signal and the syscall then blocks forever.
- - - - -
11 changed files:
- rts/RaiseAsync.c
- rts/Schedule.c
- rts/Schedule.h
- rts/Task.c
- rts/Task.h
- rts/Timer.c
- testsuite/tests/concurrent/should_run/T27113.hs
- testsuite/tests/concurrent/should_run/T27113_c.c
- + testsuite/tests/concurrent/should_run/T27113b.hs
- + testsuite/tests/concurrent/should_run/T27113c.hs
- testsuite/tests/concurrent/should_run/all.T
Changes:
=====================================
rts/RaiseAsync.c
=====================================
@@ -36,6 +36,11 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS,
Capability *target_cap USED_IF_THREADS,
MessageThrowTo *msg USED_IF_THREADS);
+#if defined(THREADED_RTS)
+static void clear_interrupt_pending_if_no_live_throw (Capability *cap,
+ StgTSO *target);
+#endif
+
/* -----------------------------------------------------------------------------
throwToSingleThreaded
@@ -338,7 +343,14 @@ check_target:
}
// nobody else can wake up this TSO after we claim the message
+#if defined(THREADED_RTS)
+ // #27113: snapshot m->target before doneWithMsgThrowTo nulls m.
+ StgTSO *m_target = m->target;
+#endif
doneWithMsgThrowTo(cap, m);
+#if defined(THREADED_RTS)
+ clear_interrupt_pending_if_no_live_throw(cap, m_target);
+#endif
raiseAsync(cap, target, msg->exception, false, NULL);
return THROWTO_SUCCESS;
@@ -433,11 +445,12 @@ check_target:
#if defined(THREADED_RTS)
{
Task *task = NULL;
- // walk suspended_ccalls to find the correct worker thread
+ InCall *target_incall = NULL;
InCall *incall;
for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
if (incall->suspended_tso == target) {
task = incall->task;
+ target_incall = incall;
break;
}
}
@@ -446,6 +459,8 @@ check_target:
if (!((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0))) {
interruptWorkerTask(task);
+ // #27113: ticker re-fires interruptOSThread while this is set.
+ RELEASE_STORE(&target_incall->interrupt_pending, 1);
}
return THROWTO_BLOCKED;
} else {
@@ -511,6 +526,9 @@ throwToSendMsg (Capability *cap STG_UNUSED,
// Block a throwTo message on the target TSO's blocked_exceptions
// queue. The current Capability must own the target TSO in order to
// modify the blocked_exceptions queue.
+//
+// #27113: cap->lock serialises the queue mutation with the cross-cap revoke
+// walk in clear_interrupt_pending_if_no_live_throw.
void
blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
{
@@ -520,8 +538,14 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
ASSERT(target->cap == cap);
dirty_TSO(cap,target); // we will modify the blocked_exceptions queue
+#if defined(THREADED_RTS)
+ ACQUIRE_LOCK(&cap->lock);
+#endif
msg->link = target->blocked_exceptions;
- target->blocked_exceptions = msg;
+ RELEASE_STORE(&target->blocked_exceptions, msg);
+#if defined(THREADED_RTS)
+ RELEASE_LOCK(&cap->lock);
+#endif
}
/* -----------------------------------------------------------------------------
@@ -581,6 +605,8 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
throwToSingleThreaded(cap, msg->target, msg->exception);
source = msg->source;
+ // #27113: msg->target == tso; its own resumeThread already cleared
+ // (or is about to clear) interrupt_pending, so no helper call here.
doneWithMsgThrowTo(cap, msg);
tryWakeupThread(cap, source);
return 1;
@@ -603,6 +629,7 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
i = lockClosure((StgClosure *)msg);
if (i != &stg_MSG_NULL_info) {
source = msg->source;
+ // #27113: msg->target == tso; its own resumeThread clears the flag.
doneWithMsgThrowTo(cap, msg);
tryWakeupThread(cap, source);
} else {
@@ -664,6 +691,54 @@ removeFromMVarBlockedQueue (StgTSO *tso)
tso->_link = END_TSO_QUEUE;
}
+#if defined(THREADED_RTS)
+// #27113: clear an interruptible-FFI target's interrupt_pending flag once
+// a revoke leaves no live (non-MSG_NULL) entry on its blocked_exceptions
+// queue. target->cap is stable while target is suspended in an interruptible
+// FFI call (no migration off the run queue), so the InCall lives on
+// target_cap->suspended_ccalls. target_cap->lock is taken unconditionally:
+// it serialises us with blockedThrowTo, suspendThread's re-arm, and
+// resumeThread's unlink — and also with the ticker's TRY_ACQUIRE_LOCK, so
+// the clear is not raced by a ticker reading the pre-clear flag.
+static void
+clear_interrupt_pending_if_no_live_throw (Capability *cap, StgTSO *target)
+{
+ if (target == NULL) return;
+ if (ACQUIRE_LOAD(&target->why_blocked) != BlockedOnCCall_Interruptible)
+ return;
+
+ Capability *target_cap = ACQUIRE_LOAD(&target->cap);
+ ACQUIRE_LOCK(&target_cap->lock);
+
+ // Find the InCall on target_cap. If target has already resumed (or
+ // wasn't really suspended on this cap), do nothing.
+ InCall *found = NULL;
+ for (InCall *ic = target_cap->suspended_ccalls; ic != NULL; ic = ic->next) {
+ if (ic->suspended_tso == target) { found = ic; break; }
+ }
+ if (found == NULL) {
+ RELEASE_LOCK(&target_cap->lock);
+ return;
+ }
+
+ bool live = false;
+ for (MessageThrowTo *t = ACQUIRE_LOAD(&target->blocked_exceptions);
+ t != END_BLOCKED_EXCEPTIONS_QUEUE;
+ t = (MessageThrowTo*)t->link) {
+ if (RELAXED_LOAD((StgWord*)&t->header.info)
+ != (StgWord)&stg_MSG_NULL_info) {
+ live = true; break;
+ }
+ }
+ if (!live) {
+ RELEASE_STORE(&found->interrupt_pending, 0);
+ }
+
+ RELEASE_LOCK(&target_cap->lock);
+ (void)cap;
+}
+#endif
+
static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
@@ -699,8 +774,17 @@ removeFromQueues(Capability *cap, StgTSO *tso)
// capabilities.
// ASSERT(m->header.info == &stg_WHITEHOLE_info);
+#if defined(THREADED_RTS)
+ // #27113: snapshot m->target before doneWithMsgThrowTo nulls m.
+ StgTSO *m_target = m->target;
+#endif
+
// unlock and revoke it at the same time
doneWithMsgThrowTo(cap, m);
+
+#if defined(THREADED_RTS)
+ clear_interrupt_pending_if_no_live_throw(cap, m_target);
+#endif
break;
}
=====================================
rts/Schedule.c
=====================================
@@ -2536,6 +2536,30 @@ suspendThread (StgRegTable *reg, bool interruptible)
suspendTask(cap,task);
cap->in_haskell = false;
+
+#if defined(THREADED_RTS)
+ // #27113: re-arm interrupt_pending if entering an interruptible call with
+ // a live throw already queued. Covers under-mask EINTR-retry: throwToMsg
+ // set the flag, resumeThread cleared it, and we're now re-entering pause.
+ // Under cap->lock, after suspendTask, so the scan and store are serialised
+ // with the revoke walk.
+ if (interruptible
+ && !((tso->flags & TSO_BLOCKEX) &&
+ ((tso->flags & TSO_INTERRUPTIBLE) == 0))) {
+ MessageThrowTo *head = ACQUIRE_LOAD(&tso->blocked_exceptions);
+ bool live = false;
+ for (MessageThrowTo *m = head;
+ m != END_BLOCKED_EXCEPTIONS_QUEUE;
+ m = (MessageThrowTo*)m->link) {
+ if (RELAXED_LOAD((StgWord*)&m->header.info)
+ != (StgWord)&stg_MSG_NULL_info) { live = true; break; }
+ }
+ if (live) {
+ RELEASE_STORE(&task->incall->interrupt_pending, 1);
+ }
+ }
+#endif
+
releaseCapability_(cap,false);
RELEASE_LOCK(&cap->lock);
@@ -2574,8 +2598,14 @@ resumeThread (void *task_)
// entry on the suspended_ccalls list will also have been
// migrated.
- // Remove the thread from the suspended list
+ // #27113: clear interrupt_pending and unlink under cap->lock so the
+ // ticker's TRY_ACQUIRE_LOCK serialises with us. Without the lock, the
+ // ticker can read a stale flag and SIGPIPE the worker after it has
+ // already returned from the FFI call onto an unrelated syscall.
+ ACQUIRE_LOCK(&cap->lock);
+ RELEASE_STORE(&incall->interrupt_pending, 0);
recoverSuspendedTask(cap,task);
+ RELEASE_LOCK(&cap->lock);
tso = incall->suspended_tso;
incall->suspended_tso = NULL;
@@ -2614,6 +2644,49 @@ resumeThread (void *task_)
return &cap->r;
}
+#if defined(THREADED_RTS)
+// #27113: re-fire interruptOSThread for InCalls whose interrupt_pending is
+// still set. Reads only InCall fields, never TSO heap state, so no GC race.
+//
+// cap->lock is held across the OS interrupt call. This pairs with the
+// same lock taken by resumeThread around its clear+unlink: when we fire
+// interruptOSThread, resumeThread is either blocked behind us on the lock
+// (signal lands harmlessly on a thread inside pthread sync) or has not yet
+// run (signal lands inside the FFI call where it belongs). Releasing the
+// lock before signalling would let resumeThread complete and re-enter
+// Haskell first, so SIGPIPE / CancelSynchronousIo would hit an unrelated
+// later syscall.
+//
+// The per-OS-call cost is small (~µs for pthread_kill on POSIX, ~tens of
+// µs for OpenThread+CancelSynchronousIo+CloseHandle on Windows), but the
+// per-tick lock-hold time scales linearly with the number of pending
+// interruptible calls on this cap. For typical workloads this is well
+// below the tick interval; FFI-heavy workloads with many concurrent
+// interruptible calls per cap may want empirical measurement.
+void
+retryInterruptibleSignals(void)
+{
+ bool any_pending = false;
+ uint32_t n = getNumCapabilities();
+ for (uint32_t i = 0; i < n; i++) {
+ Capability *cap = getCapability(i);
+ if (RELAXED_LOAD(&cap->n_suspended_ccalls) == 0) continue;
+ if (TRY_ACQUIRE_LOCK(&cap->lock) != 0) continue;
+ for (InCall *ic = cap->suspended_ccalls; ic != NULL; ic = ic->next) {
+ if (ACQUIRE_LOAD(&ic->interrupt_pending) == 0) continue;
+ // ic->task is set by newInCall and never cleared.
+ if (ic->task != NULL) {
+ any_pending = true;
+ interruptOSThread(ic->task->id);
+ }
+ }
+ RELEASE_LOCK(&cap->lock);
+ }
+ // Keep the timer alive; handle_tick's idle path would otherwise stop it.
+ if (any_pending) setRecentActivity(ACTIVITY_YES);
+}
+#endif
+
/* ---------------------------------------------------------------------------
* scheduleThread()
*
=====================================
rts/Schedule.h
=====================================
@@ -43,6 +43,10 @@ void scheduleThreadOn(Capability *cap, StgWord cpu, StgTSO *tso);
void wakeUpRts(void);
#endif
+#if defined(THREADED_RTS)
+void retryInterruptibleSignals(void);
+#endif
+
/* raiseExceptionHelper */
StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
=====================================
rts/Task.c
=====================================
@@ -267,6 +267,7 @@ newInCall (Task *task)
incall->next = NULL;
incall->prev = NULL;
incall->prev_stack = task->incall;
+ incall->interrupt_pending = 0;
task->incall = incall;
}
=====================================
rts/Task.h
=====================================
@@ -106,6 +106,15 @@ typedef struct InCall_ {
// Links InCalls onto suspended_ccalls, spare_incalls
struct InCall_ *prev;
struct InCall_ *next;
+
+ // #27113. Set (release store) by throwToMsg after it interrupts this
+ // call's worker, and re-armed by suspendThread when a live throw is
+ // already queued. Cleared by resumeThread (when the call returns) and,
+ // after a revoke via doneWithMsgThrowTo, by
+ // clear_interrupt_pending_if_no_live_throw once every queued throw is
+ // MSG_NULL. The ticker re-fires interruptOSThread while it is set. Not
+ // GC-managed — lives in malloc'd memory with the Task that owns this InCall.
+ StgWord interrupt_pending;
} InCall;
typedef struct Task_ {
=====================================
rts/Timer.c
=====================================
@@ -131,6 +131,11 @@ handle_tick(int unused STG_UNUSED)
flushEventLog(NULL);
}
}
+ // #27113 retry; gated on timer_disabled like the context-switch path
+ // above so that setNumCapabilities's resize pause is respected (#17289).
+ if (SEQ_CST_LOAD_ALWAYS(&timer_disabled) == 0) {
+ retryInterruptibleSignals();
+ }
#endif
/*
=====================================
testsuite/tests/concurrent/should_run/T27113.hs
=====================================
@@ -10,7 +10,7 @@ import System.IO
import System.Timeout
foreign import ccall interruptible "cpu_then_pause"
- c_cpu_then_pause :: CLong -> IO ()
+ c_cpu_then_pause :: CLLong -> IO ()
killDelay :: Int
killDelay = 200_000
@@ -18,7 +18,7 @@ killDelay = 200_000
joinTimeout :: Int
joinTimeout = 3_000_000
-runCase :: String -> CLong -> IO ()
+runCase :: String -> CLLong -> IO ()
runCase name spinNs = do
tid <- forkIO $
c_cpu_then_pause spinNs
=====================================
testsuite/tests/concurrent/should_run/T27113_c.c
=====================================
@@ -1,17 +1,18 @@
#include
participants (1)
-
Zubin (@wz1000)