Zubin pushed to branch wip/27113-retry at Glasgow Haskell Compiler / GHC
Commits:
-
dc4d1c48
by Zubin Duggal at 2026-05-19T18:21:00+05:30
11 changed files:
- rts/RaiseAsync.c
- rts/Schedule.c
- rts/Schedule.h
- rts/Task.c
- rts/Task.h
- rts/Timer.c
- testsuite/tests/concurrent/should_run/T27113.hs
- testsuite/tests/concurrent/should_run/T27113_c.c
- + testsuite/tests/concurrent/should_run/T27113b.hs
- + testsuite/tests/concurrent/should_run/T27113c.hs
- testsuite/tests/concurrent/should_run/all.T
Changes:
| ... | ... | @@ -36,6 +36,11 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS, |
| 36 | 36 | Capability *target_cap USED_IF_THREADS,
|
| 37 | 37 | MessageThrowTo *msg USED_IF_THREADS);
|
| 38 | 38 | |
| 39 | +#if defined(THREADED_RTS)
|
|
| 40 | +static void clear_interrupt_pending_if_no_live_throw (Capability *cap,
|
|
| 41 | + StgTSO *target);
|
|
| 42 | +#endif
|
|
| 43 | + |
|
| 39 | 44 | /* -----------------------------------------------------------------------------
|
| 40 | 45 | throwToSingleThreaded
|
| 41 | 46 | |
| ... | ... | @@ -338,7 +343,14 @@ check_target: |
| 338 | 343 | }
|
| 339 | 344 | |
| 340 | 345 | // nobody else can wake up this TSO after we claim the message
|
| 346 | +#if defined(THREADED_RTS)
|
|
| 347 | + // #27113: snapshot m->target before doneWithMsgThrowTo nulls m.
|
|
| 348 | + StgTSO *m_target = m->target;
|
|
| 349 | +#endif
|
|
| 341 | 350 | doneWithMsgThrowTo(cap, m);
|
| 351 | +#if defined(THREADED_RTS)
|
|
| 352 | + clear_interrupt_pending_if_no_live_throw(cap, m_target);
|
|
| 353 | +#endif
|
|
| 342 | 354 | |
| 343 | 355 | raiseAsync(cap, target, msg->exception, false, NULL);
|
| 344 | 356 | return THROWTO_SUCCESS;
|
| ... | ... | @@ -433,11 +445,12 @@ check_target: |
| 433 | 445 | #if defined(THREADED_RTS)
|
| 434 | 446 | {
|
| 435 | 447 | Task *task = NULL;
|
| 436 | - // walk suspended_ccalls to find the correct worker thread
|
|
| 448 | + InCall *target_incall = NULL;
|
|
| 437 | 449 | InCall *incall;
|
| 438 | 450 | for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
|
| 439 | 451 | if (incall->suspended_tso == target) {
|
| 440 | 452 | task = incall->task;
|
| 453 | + target_incall = incall;
|
|
| 441 | 454 | break;
|
| 442 | 455 | }
|
| 443 | 456 | }
|
| ... | ... | @@ -446,6 +459,8 @@ check_target: |
| 446 | 459 | if (!((target->flags & TSO_BLOCKEX) &&
|
| 447 | 460 | ((target->flags & TSO_INTERRUPTIBLE) == 0))) {
|
| 448 | 461 | interruptWorkerTask(task);
|
| 462 | + // #27113: ticker re-fires interruptOSThread while this is set.
|
|
| 463 | + RELEASE_STORE(&target_incall->interrupt_pending, 1);
|
|
| 449 | 464 | }
|
| 450 | 465 | return THROWTO_BLOCKED;
|
| 451 | 466 | } else {
|
| ... | ... | @@ -511,6 +526,9 @@ throwToSendMsg (Capability *cap STG_UNUSED, |
| 511 | 526 | // Block a throwTo message on the target TSO's blocked_exceptions
|
| 512 | 527 | // queue. The current Capability must own the target TSO in order to
|
| 513 | 528 | // modify the blocked_exceptions queue.
|
| 529 | +//
|
|
| 530 | +// #27113: cap->lock serialises the queue mutation with the cross-cap revoke
|
|
| 531 | +// walk in clear_interrupt_pending_if_no_live_throw.
|
|
| 514 | 532 | void
|
| 515 | 533 | blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
|
| 516 | 534 | {
|
| ... | ... | @@ -520,8 +538,14 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg) |
| 520 | 538 | ASSERT(target->cap == cap);
|
| 521 | 539 | |
| 522 | 540 | dirty_TSO(cap,target); // we will modify the blocked_exceptions queue
|
| 541 | +#if defined(THREADED_RTS)
|
|
| 542 | + ACQUIRE_LOCK(&cap->lock);
|
|
| 543 | +#endif
|
|
| 523 | 544 | msg->link = target->blocked_exceptions;
|
| 524 | - target->blocked_exceptions = msg;
|
|
| 545 | + RELEASE_STORE(&target->blocked_exceptions, msg);
|
|
| 546 | +#if defined(THREADED_RTS)
|
|
| 547 | + RELEASE_LOCK(&cap->lock);
|
|
| 548 | +#endif
|
|
| 525 | 549 | }
|
| 526 | 550 | |
| 527 | 551 | /* -----------------------------------------------------------------------------
|
| ... | ... | @@ -581,6 +605,8 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) |
| 581 | 605 | |
| 582 | 606 | throwToSingleThreaded(cap, msg->target, msg->exception);
|
| 583 | 607 | source = msg->source;
|
| 608 | + // #27113: msg->target == tso; its own resumeThread already cleared
|
|
| 609 | + // (or is about to clear) interrupt_pending, so no helper call here.
|
|
| 584 | 610 | doneWithMsgThrowTo(cap, msg);
|
| 585 | 611 | tryWakeupThread(cap, source);
|
| 586 | 612 | return 1;
|
| ... | ... | @@ -603,6 +629,7 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) |
| 603 | 629 | i = lockClosure((StgClosure *)msg);
|
| 604 | 630 | if (i != &stg_MSG_NULL_info) {
|
| 605 | 631 | source = msg->source;
|
| 632 | + // #27113: msg->target == tso; its own resumeThread clears the flag.
|
|
| 606 | 633 | doneWithMsgThrowTo(cap, msg);
|
| 607 | 634 | tryWakeupThread(cap, source);
|
| 608 | 635 | } else {
|
| ... | ... | @@ -664,6 +691,54 @@ removeFromMVarBlockedQueue (StgTSO *tso) |
| 664 | 691 | tso->_link = END_TSO_QUEUE;
|
| 665 | 692 | }
|
| 666 | 693 | |
| 694 | +#if defined(THREADED_RTS)
|
|
| 695 | +// #27113: clear an interruptible-FFI target's interrupt_pending flag once
|
|
| 696 | +// a revoke leaves no live (non-MSG_NULL) entry on its blocked_exceptions
|
|
| 697 | +// queue. target->cap is stable while target is suspended in an interruptible
|
|
| 698 | +// FFI call (no migration off the run queue), so the InCall lives on
|
|
| 699 | +// target_cap->suspended_ccalls. target_cap->lock is taken unconditionally:
|
|
| 700 | +// it serialises us with blockedThrowTo, suspendThread's re-arm, and
|
|
| 701 | +// resumeThread's unlink — and also with the ticker's TRY_ACQUIRE_LOCK, so
|
|
| 702 | +// the clear is not raced by a ticker reading the pre-clear flag.
|
|
| 703 | +static void
|
|
| 704 | +clear_interrupt_pending_if_no_live_throw (Capability *cap, StgTSO *target)
|
|
| 705 | +{
|
|
| 706 | + if (target == NULL) return;
|
|
| 707 | + if (ACQUIRE_LOAD(&target->why_blocked) != BlockedOnCCall_Interruptible)
|
|
| 708 | + return;
|
|
| 709 | + |
|
| 710 | + Capability *target_cap = ACQUIRE_LOAD(&target->cap);
|
|
| 711 | + ACQUIRE_LOCK(&target_cap->lock);
|
|
| 712 | + |
|
| 713 | + // Find the InCall on target_cap. If target has already resumed (or
|
|
| 714 | + // wasn't really suspended on this cap), do nothing.
|
|
| 715 | + InCall *found = NULL;
|
|
| 716 | + for (InCall *ic = target_cap->suspended_ccalls; ic != NULL; ic = ic->next) {
|
|
| 717 | + if (ic->suspended_tso == target) { found = ic; break; }
|
|
| 718 | + }
|
|
| 719 | + if (found == NULL) {
|
|
| 720 | + RELEASE_LOCK(&target_cap->lock);
|
|
| 721 | + return;
|
|
| 722 | + }
|
|
| 723 | + |
|
| 724 | + bool live = false;
|
|
| 725 | + for (MessageThrowTo *t = ACQUIRE_LOAD(&target->blocked_exceptions);
|
|
| 726 | + t != END_BLOCKED_EXCEPTIONS_QUEUE;
|
|
| 727 | + t = (MessageThrowTo*)t->link) {
|
|
| 728 | + if (RELAXED_LOAD((StgWord*)&t->header.info)
|
|
| 729 | + != (StgWord)&stg_MSG_NULL_info) {
|
|
| 730 | + live = true; break;
|
|
| 731 | + }
|
|
| 732 | + }
|
|
| 733 | + if (!live) {
|
|
| 734 | + RELEASE_STORE(&found->interrupt_pending, 0);
|
|
| 735 | + }
|
|
| 736 | + |
|
| 737 | + RELEASE_LOCK(&target_cap->lock);
|
|
| 738 | + (void)cap;
|
|
| 739 | +}
|
|
| 740 | +#endif
|
|
| 741 | + |
|
| 667 | 742 | static void
|
| 668 | 743 | removeFromQueues(Capability *cap, StgTSO *tso)
|
| 669 | 744 | {
|
| ... | ... | @@ -699,8 +774,17 @@ removeFromQueues(Capability *cap, StgTSO *tso) |
| 699 | 774 | // capabilities.
|
| 700 | 775 | // ASSERT(m->header.info == &stg_WHITEHOLE_info);
|
| 701 | 776 | |
| 777 | +#if defined(THREADED_RTS)
|
|
| 778 | + // #27113: snapshot m->target before doneWithMsgThrowTo nulls m.
|
|
| 779 | + StgTSO *m_target = m->target;
|
|
| 780 | +#endif
|
|
| 781 | + |
|
| 702 | 782 | // unlock and revoke it at the same time
|
| 703 | 783 | doneWithMsgThrowTo(cap, m);
|
| 784 | + |
|
| 785 | +#if defined(THREADED_RTS)
|
|
| 786 | + clear_interrupt_pending_if_no_live_throw(cap, m_target);
|
|
| 787 | +#endif
|
|
| 704 | 788 | break;
|
| 705 | 789 | }
|
| 706 | 790 |
| ... | ... | @@ -2536,6 +2536,30 @@ suspendThread (StgRegTable *reg, bool interruptible) |
| 2536 | 2536 | |
| 2537 | 2537 | suspendTask(cap,task);
|
| 2538 | 2538 | cap->in_haskell = false;
|
| 2539 | + |
|
| 2540 | +#if defined(THREADED_RTS)
|
|
| 2541 | + // #27113: re-arm interrupt_pending if entering an interruptible call with
|
|
| 2542 | + // a live throw already queued. Covers under-mask EINTR-retry: throwToMsg
|
|
| 2543 | + // set the flag, resumeThread cleared it, and we're now re-entering pause.
|
|
| 2544 | + // Under cap->lock, after suspendTask, so the scan and store are serialised
|
|
| 2545 | + // with the revoke walk.
|
|
| 2546 | + if (interruptible
|
|
| 2547 | + && !((tso->flags & TSO_BLOCKEX) &&
|
|
| 2548 | + ((tso->flags & TSO_INTERRUPTIBLE) == 0))) {
|
|
| 2549 | + MessageThrowTo *head = ACQUIRE_LOAD(&tso->blocked_exceptions);
|
|
| 2550 | + bool live = false;
|
|
| 2551 | + for (MessageThrowTo *m = head;
|
|
| 2552 | + m != END_BLOCKED_EXCEPTIONS_QUEUE;
|
|
| 2553 | + m = (MessageThrowTo*)m->link) {
|
|
| 2554 | + if (RELAXED_LOAD((StgWord*)&m->header.info)
|
|
| 2555 | + != (StgWord)&stg_MSG_NULL_info) { live = true; break; }
|
|
| 2556 | + }
|
|
| 2557 | + if (live) {
|
|
| 2558 | + RELEASE_STORE(&task->incall->interrupt_pending, 1);
|
|
| 2559 | + }
|
|
| 2560 | + }
|
|
| 2561 | +#endif
|
|
| 2562 | + |
|
| 2539 | 2563 | releaseCapability_(cap,false);
|
| 2540 | 2564 | |
| 2541 | 2565 | RELEASE_LOCK(&cap->lock);
|
| ... | ... | @@ -2574,8 +2598,14 @@ resumeThread (void *task_) |
| 2574 | 2598 | // entry on the suspended_ccalls list will also have been
|
| 2575 | 2599 | // migrated.
|
| 2576 | 2600 | |
| 2577 | - // Remove the thread from the suspended list
|
|
| 2601 | + // #27113: clear interrupt_pending and unlink under cap->lock so the
|
|
| 2602 | + // ticker's TRY_ACQUIRE_LOCK serialises with us. Without the lock, the
|
|
| 2603 | + // ticker can read a stale flag and SIGPIPE the worker after it has
|
|
| 2604 | + // already returned from the FFI call onto an unrelated syscall.
|
|
| 2605 | + ACQUIRE_LOCK(&cap->lock);
|
|
| 2606 | + RELEASE_STORE(&incall->interrupt_pending, 0);
|
|
| 2578 | 2607 | recoverSuspendedTask(cap,task);
|
| 2608 | + RELEASE_LOCK(&cap->lock);
|
|
| 2579 | 2609 | |
| 2580 | 2610 | tso = incall->suspended_tso;
|
| 2581 | 2611 | incall->suspended_tso = NULL;
|
| ... | ... | @@ -2614,6 +2644,49 @@ resumeThread (void *task_) |
| 2614 | 2644 | return &cap->r;
|
| 2615 | 2645 | }
|
| 2616 | 2646 | |
| 2647 | +#if defined(THREADED_RTS)
|
|
| 2648 | +// #27113: re-fire interruptOSThread for InCalls whose interrupt_pending is
|
|
| 2649 | +// still set. Reads only InCall fields, never TSO heap state, so no GC race.
|
|
| 2650 | +//
|
|
| 2651 | +// cap->lock is held across the OS interrupt call. This pairs with the
|
|
| 2652 | +// same lock taken by resumeThread around its clear+unlink: when we fire
|
|
| 2653 | +// interruptOSThread, resumeThread is either blocked behind us on the lock
|
|
| 2654 | +// (signal lands harmlessly on a thread inside pthread sync) or has not yet
|
|
| 2655 | +// run (signal lands inside the FFI call where it belongs). Releasing the
|
|
| 2656 | +// lock before signalling would let resumeThread complete and re-enter
|
|
| 2657 | +// Haskell first, so SIGPIPE / CancelSynchronousIo would hit an unrelated
|
|
| 2658 | +// later syscall.
|
|
| 2659 | +//
|
|
| 2660 | +// The per-OS-call cost is small (~µs for pthread_kill on POSIX, ~tens of
|
|
| 2661 | +// µs for OpenThread+CancelSynchronousIo+CloseHandle on Windows), but the
|
|
| 2662 | +// per-tick lock-hold time scales linearly with the number of pending
|
|
| 2663 | +// interruptible calls on this cap. For typical workloads this is well
|
|
| 2664 | +// below the tick interval; FFI-heavy workloads with many concurrent
|
|
| 2665 | +// interruptible calls per cap may want empirical measurement.
|
|
| 2666 | +void
|
|
| 2667 | +retryInterruptibleSignals(void)
|
|
| 2668 | +{
|
|
| 2669 | + bool any_pending = false;
|
|
| 2670 | + uint32_t n = getNumCapabilities();
|
|
| 2671 | + for (uint32_t i = 0; i < n; i++) {
|
|
| 2672 | + Capability *cap = getCapability(i);
|
|
| 2673 | + if (RELAXED_LOAD(&cap->n_suspended_ccalls) == 0) continue;
|
|
| 2674 | + if (TRY_ACQUIRE_LOCK(&cap->lock) != 0) continue;
|
|
| 2675 | + for (InCall *ic = cap->suspended_ccalls; ic != NULL; ic = ic->next) {
|
|
| 2676 | + if (ACQUIRE_LOAD(&ic->interrupt_pending) == 0) continue;
|
|
| 2677 | + // ic->task is set by newInCall and never cleared.
|
|
| 2678 | + if (ic->task != NULL) {
|
|
| 2679 | + any_pending = true;
|
|
| 2680 | + interruptOSThread(ic->task->id);
|
|
| 2681 | + }
|
|
| 2682 | + }
|
|
| 2683 | + RELEASE_LOCK(&cap->lock);
|
|
| 2684 | + }
|
|
| 2685 | + // Keep the timer alive; handle_tick's idle path would otherwise stop it.
|
|
| 2686 | + if (any_pending) setRecentActivity(ACTIVITY_YES);
|
|
| 2687 | +}
|
|
| 2688 | +#endif
|
|
| 2689 | + |
|
| 2617 | 2690 | /* ---------------------------------------------------------------------------
|
| 2618 | 2691 | * scheduleThread()
|
| 2619 | 2692 | *
|
| ... | ... | @@ -43,6 +43,10 @@ void scheduleThreadOn(Capability *cap, StgWord cpu, StgTSO *tso); |
| 43 | 43 | void wakeUpRts(void);
|
| 44 | 44 | #endif
|
| 45 | 45 | |
| 46 | +#if defined(THREADED_RTS)
|
|
| 47 | +void retryInterruptibleSignals(void);
|
|
| 48 | +#endif
|
|
| 49 | + |
|
| 46 | 50 | /* raiseExceptionHelper */
|
| 47 | 51 | StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
|
| 48 | 52 |
| ... | ... | @@ -267,6 +267,7 @@ newInCall (Task *task) |
| 267 | 267 | incall->next = NULL;
|
| 268 | 268 | incall->prev = NULL;
|
| 269 | 269 | incall->prev_stack = task->incall;
|
| 270 | + incall->interrupt_pending = 0;
|
|
| 270 | 271 | task->incall = incall;
|
| 271 | 272 | }
|
| 272 | 273 |
| ... | ... | @@ -106,6 +106,15 @@ typedef struct InCall_ { |
| 106 | 106 | // Links InCalls onto suspended_ccalls, spare_incalls
|
| 107 | 107 | struct InCall_ *prev;
|
| 108 | 108 | struct InCall_ *next;
|
| 109 | + |
|
| 110 | + // #27113. Set (atomically) by throwToMsg once it has fired
|
|
| 111 | + // interruptOSThread for this call's worker; cleared by resumeThread
|
|
| 112 | + // (when the call returns) and by the revoke paths in
|
|
| 113 | + // doneWithMsgThrowTo (when all queued throws against the target are
|
|
| 114 | + // MSG_NULL). The ticker reads this and re-fires interruptOSThread
|
|
| 115 | + // while it is set. Not GC-managed — lives in malloc'd memory with
|
|
| 116 | + // the Task that owns this InCall.
|
|
| 117 | + StgWord interrupt_pending;
|
|
| 109 | 118 | } InCall;
|
| 110 | 119 | |
| 111 | 120 | typedef struct Task_ {
|
| ... | ... | @@ -131,6 +131,11 @@ handle_tick(int unused STG_UNUSED) |
| 131 | 131 | flushEventLog(NULL);
|
| 132 | 132 | }
|
| 133 | 133 | }
|
| 134 | + // #27113 retry; gated on timer_disabled like the context-switch path
|
|
| 135 | + // above so that setNumCapabilities's resize pause is respected (#17289).
|
|
| 136 | + if (SEQ_CST_LOAD_ALWAYS(&timer_disabled) == 0) {
|
|
| 137 | + retryInterruptibleSignals();
|
|
| 138 | + }
|
|
| 134 | 139 | #endif
|
| 135 | 140 | |
| 136 | 141 | /*
|
| ... | ... | @@ -10,7 +10,7 @@ import System.IO |
| 10 | 10 | import System.Timeout
|
| 11 | 11 | |
| 12 | 12 | foreign import ccall interruptible "cpu_then_pause"
|
| 13 | - c_cpu_then_pause :: CLong -> IO ()
|
|
| 13 | + c_cpu_then_pause :: CLLong -> IO ()
|
|
| 14 | 14 | |
| 15 | 15 | killDelay :: Int
|
| 16 | 16 | killDelay = 200_000
|
| ... | ... | @@ -18,7 +18,7 @@ killDelay = 200_000 |
| 18 | 18 | joinTimeout :: Int
|
| 19 | 19 | joinTimeout = 3_000_000
|
| 20 | 20 | |
| 21 | -runCase :: String -> CLong -> IO ()
|
|
| 21 | +runCase :: String -> CLLong -> IO ()
|
|
| 22 | 22 | runCase name spinNs = do
|
| 23 | 23 | tid <- forkIO $
|
| 24 | 24 | c_cpu_then_pause spinNs
|
| 1 | 1 | #include <errno.h>
|
| 2 | +#include <stdint.h>
|
|
| 2 | 3 | #include <time.h>
|
| 3 | 4 | #include <unistd.h>
|
| 4 | 5 | |
| 5 | -static long now_ns(void) {
|
|
| 6 | +static int64_t now_ns(void) {
|
|
| 6 | 7 | struct timespec t;
|
| 7 | 8 | while (clock_gettime(CLOCK_MONOTONIC, &t) != 0) {
|
| 8 | 9 | if (errno != EINTR) return 0;
|
| 9 | 10 | }
|
| 10 | - return (long)t.tv_sec * 1000000000L + (long)t.tv_nsec;
|
|
| 11 | + return (int64_t)t.tv_sec * 1000000000LL + (int64_t)t.tv_nsec;
|
|
| 11 | 12 | }
|
| 12 | 13 | |
| 13 | -int cpu_then_pause(long spin_ns) {
|
|
| 14 | - long deadline = now_ns() + spin_ns;
|
|
| 14 | +int cpu_then_pause(int64_t spin_ns) {
|
|
| 15 | + int64_t deadline = now_ns() + spin_ns;
|
|
| 15 | 16 | while (now_ns() < deadline) {
|
| 16 | 17 | }
|
| 17 | 18 | return pause();
|
| 1 | +{-# LANGUAGE InterruptibleFFI #-}
|
|
| 2 | +{-# LANGUAGE ScopedTypeVariables #-}
|
|
| 3 | + |
|
| 4 | +module Main where
|
|
| 5 | + |
|
| 6 | +import Control.Concurrent
|
|
| 7 | +import Control.Exception
|
|
| 8 | +import Foreign.C.Types
|
|
| 9 | +import System.IO
|
|
| 10 | +import System.Timeout
|
|
| 11 | + |
|
| 12 | +foreign import ccall interruptible "cpu_then_pause"
|
|
| 13 | + c_cpu_then_pause :: CLLong -> IO ()
|
|
| 14 | + |
|
| 15 | +-- Recommended pattern: under mask, with allowInterrupt as the explicit
|
|
| 16 | +-- interruption point. After a SIGPIPE-driven pause EINTR, control
|
|
| 17 | +-- returns to Haskell still under mask, so the queued throw isn't
|
|
| 18 | +-- delivered until allowInterrupt. For the kill to complete, the SIGPIPE
|
|
| 19 | +-- retry must fire when the initial SIGPIPE is lost in the user-space
|
|
| 20 | +-- prologue (the spin loop) — otherwise pause blocks forever and
|
|
| 21 | +-- killThread hangs.
|
|
| 22 | +myCall :: CLLong -> IO ()
|
|
| 23 | +myCall spinNs = mask_ $ do
|
|
| 24 | + allowInterrupt
|
|
| 25 | + c_cpu_then_pause spinNs
|
|
| 26 | + allowInterrupt
|
|
| 27 | + |
|
| 28 | +main :: IO ()
|
|
| 29 | +main = do
|
|
| 30 | + hSetBuffering stdout NoBuffering
|
|
| 31 | + started <- newEmptyMVar
|
|
| 32 | + done <- newEmptyMVar
|
|
| 33 | + -- mask_ around the forkIOWithUnmask ensures the child inherits
|
|
| 34 | + -- MaskedInterruptible, so finally is set up before our killThread
|
|
| 35 | + -- below can be delivered.
|
|
| 36 | + tid <- mask_ $ forkIOWithUnmask $ \unmask ->
|
|
| 37 | + flip finally (putMVar done ()) $ do
|
|
| 38 | + putMVar started ()
|
|
| 39 | + unmask (myCall 2_000_000_000)
|
|
| 40 | + `catch` (\(_ :: SomeException) -> return ())
|
|
| 41 | + takeMVar started
|
|
| 42 | + threadDelay 200_000
|
|
| 43 | + result <- timeout 5_000_000 (killThread tid >> takeMVar done)
|
|
| 44 | + case result of
|
|
| 45 | + Just () -> putStrLn "ok"
|
|
| 46 | + Nothing -> do
|
|
| 47 | + hPutStrLn stderr "FAIL: killThread hung"
|
|
| 48 | + error "hung" |
| 1 | +{-# LANGUAGE InterruptibleFFI #-}
|
|
| 2 | +{-# LANGUAGE ScopedTypeVariables #-}
|
|
| 3 | + |
|
| 4 | +module Main where
|
|
| 5 | + |
|
| 6 | +import Control.Concurrent
|
|
| 7 | +import Control.Exception
|
|
| 8 | +import Control.Monad
|
|
| 9 | +import Data.IORef
|
|
| 10 | +import Foreign.C.Types
|
|
| 11 | +import System.Exit
|
|
| 12 | +import System.IO
|
|
| 13 | + |
|
| 14 | +foreign import ccall interruptible "pause" c_pause :: IO CInt
|
|
| 15 | + |
|
| 16 | +postRevokeWindow :: Int
|
|
| 17 | +postRevokeWindow = 2_000_000
|
|
| 18 | + |
|
| 19 | +postRevokeThreshold :: Int
|
|
| 20 | +postRevokeThreshold = 5
|
|
| 21 | + |
|
| 22 | +main :: IO ()
|
|
| 23 | +main = do
|
|
| 24 | + hSetBuffering stdout NoBuffering
|
|
| 25 | + iters <- newIORef (0 :: Int)
|
|
| 26 | + |
|
| 27 | + workerStarted <- newEmptyMVar
|
|
| 28 | + worker <- forkIO $ mask_ $ do
|
|
| 29 | + putMVar workerStarted ()
|
|
| 30 | + (forever $ do
|
|
| 31 | + atomicModifyIORef' iters (\n -> (n + 1, ()))
|
|
| 32 | + _ <- c_pause
|
|
| 33 | + return ())
|
|
| 34 | + `catch` (\(_ :: SomeException) -> return ())
|
|
| 35 | + |
|
| 36 | + takeMVar workerStarted
|
|
| 37 | + threadDelay 100_000
|
|
| 38 | + |
|
| 39 | + killer1Ready <- newEmptyMVar
|
|
| 40 | + killer1 <- forkIO $ do
|
|
| 41 | + putMVar killer1Ready ()
|
|
| 42 | + killThread worker
|
|
| 43 | + |
|
| 44 | + takeMVar killer1Ready
|
|
| 45 | + threadDelay 100_000
|
|
| 46 | + |
|
| 47 | + preRevoke <- readIORef iters
|
|
| 48 | + |
|
| 49 | + killThread killer1
|
|
| 50 | + |
|
| 51 | + threadDelay postRevokeWindow
|
|
| 52 | + |
|
| 53 | + postRevoke <- readIORef iters
|
|
| 54 | + let storm = postRevoke - preRevoke
|
|
| 55 | + if storm > postRevokeThreshold
|
|
| 56 | + then do
|
|
| 57 | + hPutStrLn stderr ("FAIL: post-revoke EINTR storm, iterations="
|
|
| 58 | + ++ show storm
|
|
| 59 | + ++ " (pre=" ++ show preRevoke
|
|
| 60 | + ++ " post=" ++ show postRevoke ++ ")")
|
|
| 61 | + exitFailure
|
|
| 62 | + else
|
|
| 63 | + putStrLn ("ok (post-revoke iterations=" ++ show storm ++ ")") |
| ... | ... | @@ -208,11 +208,24 @@ test('foreignInterruptible', [when(fast(), skip), |
| 208 | 208 | |
| 209 | 209 | test('T27113',
|
| 210 | 210 | [when(opsys('mingw32'), skip),
|
| 211 | + only_threaded_ways,
|
|
| 212 | + req_c],
|
|
| 213 | + compile_and_run, ['T27113_c.c'])
|
|
| 214 | + |
|
| 215 | +test('T27113b',
|
|
| 216 | + [extra_files(['T27113_c.c']),
|
|
| 217 | + when(opsys('mingw32'), skip),
|
|
| 211 | 218 | only_threaded_ways,
|
| 212 | 219 | req_c,
|
| 213 | - expect_broken(27113)],
|
|
| 220 | + ignore_stdout],
|
|
| 214 | 221 | compile_and_run, ['T27113_c.c'])
|
| 215 | 222 | |
| 223 | +test('T27113c',
|
|
| 224 | + [when(opsys('mingw32'), skip),
|
|
| 225 | + only_threaded_ways,
|
|
| 226 | + ignore_stdout],
|
|
| 227 | + compile_and_run, [''])
|
|
| 228 | + |
|
| 216 | 229 | test('conc037', only_ways(['threaded1', 'threaded2', 'nonmoving_thr']), compile_and_run, [''])
|
| 217 | 230 | test('conc038', only_ways(['threaded1', 'threaded2', 'nonmoving_thr']), compile_and_run, [''])
|
| 218 | 231 |