Duncan Coutts pushed to branch wip/dcoutts/issue-26717 at Glasgow Haskell Compiler / GHC Commits: 08518499 by Duncan Coutts at 2026-04-01T09:19:44+01:00 Follow atomic access rules more consistently for tso->why_blocked The rule is this: store block_info *before* why_blocked; store why_blocked using store release; load why_blocked using load acquire; load block_info *after* why_blocked. This is an atomic store release / load acquire pair and (if the reads are in a separate thread to the writes, and the read receives the value stored) then this guarantees a full "happens before" relationship of these stores and loads. In some cases, we do not need a full load acquire, because we don't read the block_info at all and so do not need any ordering. In this case we just need an atomic relaxed load. This was being followed in most places, but not all. If there's good reason in any case that we don't need atomic access, then we should document that in a comment. In the absence of that I think it's easier to follow the rule everywhere. - - - - - 10 changed files: - rts/IOManager.c - rts/RaiseAsync.c - rts/Schedule.c - rts/Threads.c - rts/TraverseHeap.c - rts/posix/Poll.c - rts/posix/Select.c - rts/posix/Timeout.c - rts/sm/Sanity.c - rts/win32/AsyncMIO.c Changes: ===================================== rts/IOManager.c ===================================== @@ -772,11 +772,12 @@ bool syncIOWaitReady(Capability *cap, #if defined(IOMGR_ENABLED_SELECT) case IO_MANAGER_SELECT: { - StgWord why_blocked = (rw == IORead ? BlockedOnRead : BlockedOnWrite) - | BlockInfoForceNonClosure; + unsigned int why_blocked = (rw == IORead ? 
BlockedOnRead + : BlockedOnWrite) + | BlockInfoForceNonClosure; tso->block_info.fd = fd; - RELEASE_STORE(&tso->why_blocked, why_blocked); appendToIOBlockedQueue(cap, tso); + RELEASE_STORE(&tso->why_blocked, why_blocked); return true; } #endif @@ -833,8 +834,8 @@ bool syncDelay(Capability *cap, StgTSO *tso, HsInt us_delay) { LowResTime target = getDelayTarget(us_delay); tso->block_info.target = target; - RELEASE_STORE(&tso->why_blocked, BlockedOnDelay | BlockInfoForceNonClosure); insertIntoSleepingQueue(cap, tso, target); + RELEASE_STORE(&tso->why_blocked, BlockedOnDelay | BlockInfoForceNonClosure); return true; } #endif @@ -854,8 +855,8 @@ bool syncDelay(Capability *cap, StgTSO *tso, HsInt us_delay) * simplifies matters, so set the status to OnDoProc and put the * delayed thread on the blocked_queue. */ - RELEASE_STORE(&tso->why_blocked, BlockedOnDoProc); appendToIOBlockedQueue(cap, tso); + RELEASE_STORE(&tso->why_blocked, BlockedOnDoProc); return true; } #endif ===================================== rts/RaiseAsync.c ===================================== @@ -233,7 +233,6 @@ throwTo (Capability *cap, // the Capability we hold uint32_t throwToMsg (Capability *cap, MessageThrowTo *msg) { - StgWord status; StgTSO *target = ACQUIRE_LOAD(&msg->target); Capability *target_cap; @@ -268,9 +267,9 @@ check_target: return THROWTO_BLOCKED; } - status = ACQUIRE_LOAD(&target->why_blocked); + unsigned int why_blocked = ACQUIRE_LOAD(&target->why_blocked); - switch (UntagWhyBlocked(status)) { + switch (UntagWhyBlocked(why_blocked)) { case NotBlocked: { if ((target->flags & TSO_BLOCKEX) == 0) { @@ -370,8 +369,9 @@ check_target: // we have the MVar, let's check whether the thread // is still blocked on the same MVar. 
- if ((target->why_blocked != BlockedOnMVar - && target->why_blocked != BlockedOnMVarRead) + unsigned int why_blocked_still = ACQUIRE_LOAD(&target->why_blocked); + if (( why_blocked_still != BlockedOnMVar + && why_blocked_still != BlockedOnMVarRead) || target->block_info.mvar != mvar) { unlockClosure((StgClosure *)mvar, info); goto retry; @@ -490,7 +490,7 @@ check_target: goto retry; default: - barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked); + barf("throwTo: unrecognised why_blocked (%d)", why_blocked); } barf("throwTo"); } @@ -667,7 +667,7 @@ removeFromMVarBlockedQueue (StgTSO *tso) static void removeFromQueues(Capability *cap, StgTSO *tso) { - switch (UntagWhyBlocked(tso->why_blocked)) { + switch (UntagWhyBlocked(ACQUIRE_LOAD(&tso->why_blocked))) { case NotBlocked: case ThreadMigrating: @@ -721,8 +721,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) } done: - RELAXED_STORE(&tso->why_blocked, NotBlocked); appendToRunQueue(cap, tso); + RELEASE_STORE(&tso->why_blocked, NotBlocked); } /* ----------------------------------------------------------------------------- @@ -1105,9 +1105,9 @@ done: IF_DEBUG(sanity, checkTSO(tso)); // wake it up - if (tso->why_blocked != NotBlocked) { - tso->why_blocked = NotBlocked; + if (RELAXED_LOAD(&tso->why_blocked) != NotBlocked) { appendToRunQueue(cap,tso); + RELEASE_STORE(&tso->why_blocked, NotBlocked); } return tso; ===================================== rts/Schedule.c ===================================== @@ -524,7 +524,7 @@ run_thread: #endif if (ret == ThreadBlocked) { - StgWord why_blocked = ACQUIRE_LOAD(&t->why_blocked); + unsigned int why_blocked = ACQUIRE_LOAD(&t->why_blocked); if (why_blocked == BlockedOnBlackHole) { StgTSO *owner = blackHoleOwner(t->block_info.bh->bh); traceEventStopThread(cap, t, eventlogStopStatus(why_blocked), @@ -1098,7 +1098,7 @@ schedulePostRunThread (Capability *cap, StgTSO *t) // // and a is never equal to b given a consistent view of memory. 
// - if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { + if (t -> trec != NO_TREC && RELAXED_LOAD(&t->why_blocked) == NotBlocked) { if (!stmValidateNestOfTransactions(cap, t -> trec, true)) { debugTrace(DEBUG_sched | DEBUG_stm, "trec %p found wasting its time", t); @@ -2523,9 +2523,9 @@ suspendThread (StgRegTable *reg, bool interruptible) tso->block_info.unused = END_TSO_QUEUE; if (interruptible) { - tso->why_blocked = BlockedOnCCall_Interruptible; + RELEASE_STORE(&tso->why_blocked, BlockedOnCCall_Interruptible); } else { - tso->why_blocked = BlockedOnCCall; + RELEASE_STORE(&tso->why_blocked, BlockedOnCCall); } // Hand back capability @@ -2583,16 +2583,25 @@ resumeThread (void *task_) tso = incall->suspended_tso; incall->suspended_tso = NULL; incall->suspended_cap = NULL; + + // we set why_blocked previously in suspendThread + ASSERT(tso->why_blocked == BlockedOnCCall || + tso->why_blocked == BlockedOnCCall_Interruptible); + // we will modify tso->_link IF_NONMOVING_WRITE_BARRIER_ENABLED { updateRemembSetPushClosure(cap, (StgClosure *)tso->_link); } tso->_link = END_TSO_QUEUE; + // but no need to modify tso->block_info.prev as coincidentally + // it has the value we want already (since in suspendThread we set + // tso->block_info.unused to END_TSO_QUEUE for BlockedOnCCall). + ASSERT(tso->block_info.prev == END_TSO_QUEUE); traceEventRunThread(cap, tso); /* Reset blocking status */ - tso->why_blocked = NotBlocked; + RELEASE_STORE(&tso->why_blocked, NotBlocked); if ((tso->flags & TSO_BLOCKEX) == 0) { // avoid locking the TSO if we don't have to @@ -2944,8 +2953,9 @@ deleteThread (StgTSO *tso) // The TSO must be on the run queue of the Capability we own, or // we must own all Capabilities. 
- if (tso->why_blocked != BlockedOnCCall && - tso->why_blocked != BlockedOnCCall_Interruptible) { + unsigned int why_blocked = RELAXED_LOAD(&tso->why_blocked); + if (why_blocked != BlockedOnCCall && + why_blocked != BlockedOnCCall_Interruptible) { throwToSingleThreaded(tso->cap,tso,NULL); } } @@ -2956,10 +2966,12 @@ deleteThread_(StgTSO *tso) { // for forkProcess only: // like deleteThread(), but we delete threads in foreign calls, too. - if (tso->why_blocked == BlockedOnCCall || - tso->why_blocked == BlockedOnCCall_Interruptible) { + unsigned int why_blocked = RELAXED_LOAD(&tso->why_blocked); + if (why_blocked == BlockedOnCCall || + why_blocked == BlockedOnCCall_Interruptible) { tso->what_next = ThreadKilled; appendToRunQueue(tso->cap, tso); + RELEASE_STORE(&tso->why_blocked, NotBlocked); } else { deleteThread(tso); } @@ -3310,7 +3322,7 @@ resurrectThreads (StgTSO *threads) // Wake up the thread on the Capability it was last on cap = tso->cap; - switch (tso->why_blocked) { + switch (RELAXED_LOAD(&tso->why_blocked)) { case BlockedOnMVar: case BlockedOnMVarRead: /* Called by GC - sched_mutex lock is currently held. */ ===================================== rts/Threads.c ===================================== @@ -335,8 +335,8 @@ tryWakeupThread (Capability *cap, StgTSO *tso) unblock: // just run the thread now, if the BH is not really available, // we'll block again. - tso->why_blocked = NotBlocked; appendToRunQueue(cap,tso); + RELEASE_STORE(&tso->why_blocked, NotBlocked); // We used to set the context switch flag here, which would // trigger a context switch a short time in the future (at the end @@ -368,7 +368,7 @@ migrateThread (Capability *from, StgTSO *tso, Capability *to) // ThreadMigrating tells the target cap that it needs to be added to // the run queue when it receives the MSG_TRY_WAKEUP. 
tso->block_info.unused = END_TSO_QUEUE; - tso->why_blocked = ThreadMigrating; + RELEASE_STORE(&tso->why_blocked, ThreadMigrating); tso->cap = to; tryWakeupThread(from, tso); } @@ -847,7 +847,7 @@ loop: // save why_blocked here, because waking up the thread destroys // this information - StgWord why_blocked = ACQUIRE_LOAD(&tso->why_blocked); + unsigned int why_blocked = ACQUIRE_LOAD(&tso->why_blocked); ASSERT(why_blocked == BlockedOnMVarRead || why_blocked == BlockedOnMVar); ASSERT(tso->block_info.mvar == mvar); @@ -1017,7 +1017,7 @@ printAllThreads(void) debugBelch("other threads:\n"); for (g = 0; g < RtsFlags.GcFlags.generations; g++) { for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) { - if (t->why_blocked != NotBlocked) { + if (RELAXED_LOAD(&t->why_blocked) != NotBlocked) { printThreadStatus(t); } next = t->global_link; ===================================== rts/TraverseHeap.c ===================================== @@ -1243,7 +1243,7 @@ inner_loop: traversePushClosure(ts, (StgClosure *) tso->bq, c, sep, child_data); traversePushClosure(ts, (StgClosure *) tso->trec, c, sep, child_data); - StgWord why_blocked = ACQUIRE_LOAD(&tso->why_blocked); + unsigned int why_blocked = ACQUIRE_LOAD(&tso->why_blocked); if (IsBlockInfoClosure(why_blocked) && why_blocked != NotBlocked) { // The NotBlocked case uses block_info.prev as a TSO back link. // Do not follow in that case or we'll get into a loop. ===================================== rts/posix/Poll.c ===================================== @@ -146,8 +146,9 @@ bool syncIOWaitReadyPoll(Capability *cap, StgTSO *tso, aiop->notify.tso = tso; aiop->notify_type = NotifyTSO; aiop->live = &stg_ASYNCIO_LIVE0_closure; - tso->why_blocked = rw == IORead ? BlockedOnRead : BlockedOnWrite; tso->block_info.aiop = aiop; + RELEASE_STORE(&tso->why_blocked, rw == IORead ? 
BlockedOnRead + : BlockedOnWrite); return asyncIOWaitReadyPoll(cap, aiop, rw, fd); } @@ -258,10 +259,9 @@ static void notifyIOCompletion(Capability *cap, StgAsyncIOOp *aiop) * cap because the tso was not on the run queue of any cap and * so is not subject to thread migration. */ - StgTSO *tso = aiop->notify.tso; - tso->why_blocked = NotBlocked; - tso->_link = END_TSO_QUEUE; + StgTSO *tso = aiop->notify.tso; pushOnRunQueue(cap, tso); + RELEASE_STORE(&tso->why_blocked, NotBlocked); } break; } ===================================== rts/posix/Select.c ===================================== @@ -105,11 +105,10 @@ static bool wakeUpSleepingThreads (Capability *cap, LowResTime now) break; } iomgr->sleeping_queue = tso->_link; - RELAXED_STORE(&tso->why_blocked, NotBlocked); - tso->_link = END_TSO_QUEUE; IF_DEBUG(scheduler, debugBelch("Waking up sleeping thread %" FMT_StgThreadID "\n", tso->id)); pushOnRunQueue(cap,tso); + RELEASE_STORE(&tso->why_blocked, NotBlocked); flag = true; } return flag; @@ -397,7 +396,7 @@ awaitCompletedTimeoutsOrIOSelect(Capability *cap, bool wait) int fd; enum FdState fd_state = RTS_FD_IS_BLOCKING; - switch (UntagWhyBlocked(tso->why_blocked)) { + switch (UntagWhyBlocked(ACQUIRE_LOAD(&tso->why_blocked))) { case BlockedOnRead: fd = tso->block_info.fd; @@ -436,9 +435,8 @@ awaitCompletedTimeoutsOrIOSelect(Capability *cap, bool wait) IF_DEBUG(scheduler, debugBelch("Waking up blocked thread %" FMT_StgThreadID "\n", tso->id)); - tso->why_blocked = NotBlocked; - tso->_link = END_TSO_QUEUE; pushOnRunQueue(cap,tso); + RELEASE_STORE(&tso->why_blocked, NotBlocked); break; case RTS_FD_IS_BLOCKING: if (prev == NULL) ===================================== rts/posix/Timeout.c ===================================== @@ -48,8 +48,8 @@ bool syncDelayTimeout(Capability *cap, StgTSO *tso, HsInt us_delay) initElemTimeoutQueue(timeout, notify, NotifyTSO, cap->r.rCCCS); ASSERT(tso->why_blocked == NotBlocked); - tso->why_blocked = BlockedOnDelay; tso->block_info.timeout = 
timeout; + RELEASE_STORE(&tso->why_blocked, BlockedOnDelay); insertTimeoutQueue(&cap->iomgr->timeout_queue, timeout, target); @@ -118,10 +118,10 @@ static void notifyTimeoutCompletion(Capability *cap, StgTimeout *timeout) switch (timeout->notify_type) { case NotifyTSO: { - StgTSO *tso = timeout->notify.tso; - tso->why_blocked = NotBlocked; - tso->_link = END_TSO_QUEUE; + StgTSO *tso = timeout->notify.tso; + tso->_link = END_TSO_QUEUE; pushOnRunQueue(cap, tso); + RELEASE_STORE(&tso->why_blocked, NotBlocked); break; } case NotifyMVar: ===================================== rts/sm/Sanity.c ===================================== @@ -779,7 +779,7 @@ checkTSO(StgTSO *tso) info == &stg_WHITEHOLE_info); // used to happen due to STM doing // lockTSO(), might not happen now - unsigned why_blocked = ACQUIRE_LOAD(&tso->why_blocked); + unsigned int why_blocked = ACQUIRE_LOAD(&tso->why_blocked); switch (why_blocked) { case NotBlocked: case BlockedOnMVar: ===================================== rts/win32/AsyncMIO.c ===================================== @@ -318,8 +318,6 @@ start: } // Terminates the run queue + this inner for-loop. - tso->_link = END_TSO_QUEUE; - tso->why_blocked = NotBlocked; // For stg_block_async frames (read/write/doProc), // write len and errCode directly to the stack. // For stg_block_noregs frames (delay), nothing @@ -329,14 +327,14 @@ start: tso->stackobj->sp[2] = (W_)errCode; } pushOnRunQueue(&MainCapability, tso); + RELEASE_STORE(&tso->why_blocked, NotBlocked); break; } break; - default: - if (tso->why_blocked != NotBlocked) { - barf("awaitRequests: odd thread state"); - } + case NotBlocked: break; + default: + barf("awaitRequests: odd thread state"); } prev = tso; View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/085184999d60ad1a98f3f08124041a8d... -- View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/085184999d60ad1a98f3f08124041a8d... You're receiving this email because of your account on gitlab.haskell.org.
participants (1)
-
Duncan Coutts (@dcoutts)