[Git][ghc/ghc][wip/27113-retry] rts: Retry SIGPIPE delivery for interruptible FFI calls (#27113)
Zubin pushed to branch wip/27113-retry at Glasgow Haskell Compiler / GHC
Commits:
daa65163 by Zubin Duggal at 2026-05-19T18:46:15+05:30
rts: Retry SIGPIPE delivery for interruptible FFI calls (#27113)
Closes a race in the threaded RTS where a SIGPIPE fired by throwTo to
cancel an interruptible foreign call may land in the user-space prologue
of the FFI wrapper, between suspendThread releasing the capability and
the kernel syscall starting. GHC's empty SIGPIPE handler swallows the
signal and the syscall then blocks forever.
- - - - -
11 changed files:
- rts/RaiseAsync.c
- rts/Schedule.c
- rts/Schedule.h
- rts/Task.c
- rts/Task.h
- rts/Timer.c
- testsuite/tests/concurrent/should_run/T27113.hs
- testsuite/tests/concurrent/should_run/T27113_c.c
- + testsuite/tests/concurrent/should_run/T27113b.hs
- + testsuite/tests/concurrent/should_run/T27113c.hs
- testsuite/tests/concurrent/should_run/all.T
Changes:
=====================================
rts/RaiseAsync.c
=====================================
@@ -36,6 +36,11 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS,
Capability *target_cap USED_IF_THREADS,
MessageThrowTo *msg USED_IF_THREADS);
+#if defined(THREADED_RTS)
+static void clear_interrupt_pending_if_no_live_throw (Capability *cap,
+ StgTSO *target);
+#endif
+
/* -----------------------------------------------------------------------------
throwToSingleThreaded
@@ -338,7 +343,13 @@ check_target:
}
// nobody else can wake up this TSO after we claim the message
+#if defined(THREADED_RTS)
+ StgTSO *m_target = m->target;
+#endif
doneWithMsgThrowTo(cap, m);
+#if defined(THREADED_RTS)
+ clear_interrupt_pending_if_no_live_throw(cap, m_target);
+#endif
raiseAsync(cap, target, msg->exception, false, NULL);
return THROWTO_SUCCESS;
@@ -433,11 +444,12 @@ check_target:
#if defined(THREADED_RTS)
{
Task *task = NULL;
- // walk suspended_ccalls to find the correct worker thread
+ InCall *target_incall = NULL;
InCall *incall;
for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
if (incall->suspended_tso == target) {
task = incall->task;
+ target_incall = incall;
break;
}
}
@@ -446,6 +458,7 @@ check_target:
if (!((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0))) {
interruptWorkerTask(task);
+ RELEASE_STORE(&target_incall->interrupt_pending, 1);
}
return THROWTO_BLOCKED;
} else {
@@ -520,8 +533,14 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
ASSERT(target->cap == cap);
dirty_TSO(cap,target); // we will modify the blocked_exceptions queue
+#if defined(THREADED_RTS)
+ ACQUIRE_LOCK(&cap->lock);
+#endif
msg->link = target->blocked_exceptions;
- target->blocked_exceptions = msg;
+ RELEASE_STORE(&target->blocked_exceptions, msg);
+#if defined(THREADED_RTS)
+ RELEASE_LOCK(&cap->lock);
+#endif
}
/* -----------------------------------------------------------------------------
@@ -664,6 +683,46 @@ removeFromMVarBlockedQueue (StgTSO *tso)
tso->_link = END_TSO_QUEUE;
}
+#if defined(THREADED_RTS)
+static void  // Resets the interrupt_pending flag of target's suspended InCall when no live throw remains queued on target.
+clear_interrupt_pending_if_no_live_throw (Capability *cap, StgTSO *target)  // cap is not otherwise used in this function (see the (void) cast at the end)
+{
+ if (target == NULL) return;  // no target thread: nothing to clear
+ if (ACQUIRE_LOAD(&target->why_blocked) != BlockedOnCCall_Interruptible)  // only proceed for a target recorded as BlockedOnCCall_Interruptible
+ return;
+
+ Capability *target_cap = ACQUIRE_LOAD(&target->cap);  // work on the target's own capability rather than the cap argument
+ ACQUIRE_LOCK(&target_cap->lock);  // held across the list walk and the flag update; released on both exit paths below
+
+ // Find the InCall on target_cap. If target has already resumed (or
+ // wasn't really suspended on this cap), do nothing.
+ InCall *found = NULL;
+ for (InCall *ic = target_cap->suspended_ccalls; ic != NULL; ic = ic->next) {
+ if (ic->suspended_tso == target) { found = ic; break; }
+ }
+ if (found == NULL) {
+ RELEASE_LOCK(&target_cap->lock);
+ return;
+ }
+
+ bool live = false;  // becomes true when a queued message's info pointer is not stg_MSG_NULL_info
+ for (MessageThrowTo *t = ACQUIRE_LOAD(&target->blocked_exceptions);
+ t != END_BLOCKED_EXCEPTIONS_QUEUE;
+ t = (MessageThrowTo*)t->link) {
+ if (RELAXED_LOAD((StgWord*)&t->header.info)
+ != (StgWord)&stg_MSG_NULL_info) {  // any info pointer other than stg_MSG_NULL_info is treated as a live throw
+ live = true; break;
+ }
+ }
+ if (!live) {
+ RELEASE_STORE(&found->interrupt_pending, 0);  // no live throw queued: clear the pending flag on the InCall
+ }
+
+ RELEASE_LOCK(&target_cap->lock);
+ (void)cap;  // cap is accepted but unused; the cast marks that explicitly
+}
+#endif
+
static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
@@ -699,8 +758,16 @@ removeFromQueues(Capability *cap, StgTSO *tso)
// capabilities.
// ASSERT(m->header.info == &stg_WHITEHOLE_info);
+#if defined(THREADED_RTS)
+ StgTSO *m_target = m->target;
+#endif
+
// unlock and revoke it at the same time
doneWithMsgThrowTo(cap, m);
+
+#if defined(THREADED_RTS)
+ clear_interrupt_pending_if_no_live_throw(cap, m_target);
+#endif
break;
}
=====================================
rts/Schedule.c
=====================================
@@ -2536,6 +2536,25 @@ suspendThread (StgRegTable *reg, bool interruptible)
suspendTask(cap,task);
cap->in_haskell = false;
+
+#if defined(THREADED_RTS)
+ if (interruptible
+ && !((tso->flags & TSO_BLOCKEX) &&
+ ((tso->flags & TSO_INTERRUPTIBLE) == 0))) {
+ MessageThrowTo *head = ACQUIRE_LOAD(&tso->blocked_exceptions);
+ bool live = false;
+ for (MessageThrowTo *m = head;
+ m != END_BLOCKED_EXCEPTIONS_QUEUE;
+ m = (MessageThrowTo*)m->link) {
+ if (RELAXED_LOAD((StgWord*)&m->header.info)
+ != (StgWord)&stg_MSG_NULL_info) { live = true; break; }
+ }
+ if (live) {
+ RELEASE_STORE(&task->incall->interrupt_pending, 1);
+ }
+ }
+#endif
+
releaseCapability_(cap,false);
RELEASE_LOCK(&cap->lock);
@@ -2574,8 +2593,10 @@ resumeThread (void *task_)
// entry on the suspended_ccalls list will also have been
// migrated.
- // Remove the thread from the suspended list
+ ACQUIRE_LOCK(&cap->lock);
+ RELEASE_STORE(&incall->interrupt_pending, 0);
recoverSuspendedTask(cap,task);
+ RELEASE_LOCK(&cap->lock);
tso = incall->suspended_tso;
incall->suspended_tso = NULL;
@@ -2614,6 +2635,31 @@ resumeThread (void *task_)
return &cap->r;
}
+#if defined(THREADED_RTS)
+void  // Walks every capability and calls interruptOSThread for each suspended InCall whose interrupt_pending flag is set.
+retryInterruptibleSignals(void)
+{
+ bool any_pending = false;  // set once at least one task has been interrupted during this pass
+ uint32_t n = getNumCapabilities();
+ for (uint32_t i = 0; i < n; i++) {
+ Capability *cap = getCapability(i);
+ if (RELAXED_LOAD(&cap->n_suspended_ccalls) == 0) continue;  // counter reads zero: skip without touching the lock
+ if (TRY_ACQUIRE_LOCK(&cap->lock) != 0) continue;  // nonzero result: skip this capability for this pass (presumably the lock was not obtained -- confirm against TRY_ACQUIRE_LOCK)
+ for (InCall *ic = cap->suspended_ccalls; ic != NULL; ic = ic->next) {
+ if (ACQUIRE_LOAD(&ic->interrupt_pending) == 0) continue;  // flag clear: nothing to retry for this InCall
+ // ic->task is set by newInCall and never cleared.
+ if (ic->task != NULL) {
+ any_pending = true;
+ interruptOSThread(ic->task->id);  // presumably re-delivers the interrupt signal to that task's OS thread -- confirm against interruptOSThread
+ }
+ }
+ RELEASE_LOCK(&cap->lock);
+ }
+ // Keep the timer alive; handle_tick's idle path would otherwise stop it.
+ if (any_pending) setRecentActivity(ACTIVITY_YES);
+}
+#endif
+
/* ---------------------------------------------------------------------------
* scheduleThread()
*
=====================================
rts/Schedule.h
=====================================
@@ -43,6 +43,10 @@ void scheduleThreadOn(Capability *cap, StgWord cpu, StgTSO *tso);
void wakeUpRts(void);
#endif
+#if defined(THREADED_RTS)
+void retryInterruptibleSignals(void);
+#endif
+
/* raiseExceptionHelper */
StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
=====================================
rts/Task.c
=====================================
@@ -267,6 +267,7 @@ newInCall (Task *task)
incall->next = NULL;
incall->prev = NULL;
incall->prev_stack = task->incall;
+ incall->interrupt_pending = 0;
task->incall = incall;
}
=====================================
rts/Task.h
=====================================
@@ -106,6 +106,8 @@ typedef struct InCall_ {
// Links InCalls onto suspended_ccalls, spare_incalls
struct InCall_ *prev;
struct InCall_ *next;
+
+ StgWord interrupt_pending;
} InCall;
typedef struct Task_ {
=====================================
rts/Timer.c
=====================================
@@ -131,6 +131,9 @@ handle_tick(int unused STG_UNUSED)
flushEventLog(NULL);
}
}
+ if (SEQ_CST_LOAD_ALWAYS(&timer_disabled) == 0) {
+ retryInterruptibleSignals();
+ }
#endif
/*
=====================================
testsuite/tests/concurrent/should_run/T27113.hs
=====================================
@@ -10,7 +10,7 @@ import System.IO
import System.Timeout
foreign import ccall interruptible "cpu_then_pause"
- c_cpu_then_pause :: CLong -> IO ()
+ c_cpu_then_pause :: CLLong -> IO ()
killDelay :: Int
killDelay = 200_000
@@ -18,7 +18,7 @@ killDelay = 200_000
joinTimeout :: Int
joinTimeout = 3_000_000
-runCase :: String -> CLong -> IO ()
+runCase :: String -> CLLong -> IO ()
runCase name spinNs = do
tid <- forkIO $
c_cpu_then_pause spinNs
=====================================
testsuite/tests/concurrent/should_run/T27113_c.c
=====================================
@@ -1,17 +1,18 @@
#include
participants (1)
-
Zubin (@wz1000)