
Greetings,

Something is not working for me, and I could use some more eyes on this. What I'm trying to accomplish is to implement a simple barrier for ten worker threads (id = 0..9) using STM. With or without the barrier, the program produces an unordered interleaving of the output from the workers. Here's what I get with the program below:

$ ghc --make Main.lhs
$ a.out
0134568027913457896012579026813423904671238455702468159367839684012570279134685049137825901642375689134057892610462578903156012389473268457910267801345923924567801304689235714013679458256702465913878...

And here's what I get without the line "atomically $ barrier tv id":

$ a.out
1249056782934567210845619720538461975203698469175203469850123485076912348579406123894625738942106381592740631859274163841092315768491302578416930728254169302785693024917853029640217390856490...

The first run should've been something like:

012345678901234567890123456789012345...

since each worker thread 0..9 should write its id out once per iteration, and the workers should iterate in sync.

Here's the code:
module Main where

import Control.Concurrent
import Control.Concurrent.STM
import System.Random

worker :: Int -> TVar Int -> IO ()
worker id tv = do
    sleepingTime <- randomRIO (0, 50000)
    threadDelay sleepingTime
    putStr $ show id
    atomically $ barrier tv id
    worker id tv
Each worker sleeps for some time, then outputs its id and waits at the barrier for all the other workers to finish their sleep+output.
barrier :: TVar Int -> Int -> STM ()
barrier tv id = do
    passed <- readTVar tv
    if (passed `mod` 10 == id)
        then writeTVar tv (passed+1)
        else retry
The barrier is simply a global variable, tv, which holds the number of times any worker passed the barrier. Now, a worker may only pass the barrier iff the worker with an id one less just passed, or else it should block.
main :: IO ()
main = do
    tv <- atomically $ newTVar 0
    for [0..9] $ \i -> forkIO $ worker i tv
    threadDelay (10*10^6)

for = flip mapM_
The main thread just initializes the pass counter, starts 10 worker threads, and waits for ten seconds.

I'd like to hear some comments on the approach, and perhaps even some insight into why it doesn't work.

Regards,

Peter Eriksen

On Fri, Dec 16, 2005 at 05:46:33PM +0100, Peter Eriksen wrote:
Here's the code:
module Main where
import Control.Concurrent
import Control.Concurrent.STM
import System.Random

worker :: Int -> TVar Int -> IO ()
worker id tv = do
    sleepingTime <- randomRIO (0, 50000)
    threadDelay sleepingTime
    putStr $ show id
    atomically $ barrier tv id
    worker id tv
You've got the barrier after the putStr, so there's nothing to make the first ten putStrs be in order. I think you need a non-updating barrier before the putStr and then an updating function after the putStr (to tell the next worker that it is free to print).
barrier :: TVar Int -> Int -> STM ()
barrier tv id = do
    passed <- readTVar tv
    if (passed `mod` 10 == id)
        then writeTVar tv (passed+1)
        else retry

move_along :: TVar Int -> Int -> STM ()
move_along tv id = do
    passed <- readTVar tv
    writeTVar tv (passed+1)

worker :: Int -> TVar Int -> IO ()
worker id tv = do
    sleepingTime <- randomRIO (0, 50000)
    threadDelay sleepingTime
    atomically $ barrier tv id
    putStr $ show id
    atomically $ move_along tv id
    worker id tv

-- 
David Roundy
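A minimal sketch of the wait-then-advance scheme described above, under a few stated assumptions. The wait step (`waitTurn`, a hypothetical name) only reads the counter, as the prose suggests, whereas the quoted `barrier` still increments it. Output is appended to a TVar log rather than written to stdout, and the random sleep is replaced by a fixed per-worker stagger so that the result is reproducible. `runOrdered` and its parameters are illustrative and not part of the original program.

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM_)

-- Block until it is worker `wid`'s turn. Reads the counter only.
waitTurn :: Int -> TVar Int -> Int -> STM ()
waitTurn n tv wid = do
    passed <- readTVar tv
    check (passed `mod` n == wid)

-- Hand the turn to the next worker.
moveAlong :: TVar Int -> STM ()
moveAlong tv = modifyTVar' tv (+ 1)

-- Run `n` workers for `rounds` iterations each and return what they logged.
runOrdered :: Int -> Int -> IO String
runOrdered n rounds = do
    turn <- newTVarIO 0
    out  <- newTVarIO ""           -- assumption: a log standing in for stdout
    done <- newTVarIO (0 :: Int)   -- number of workers that have finished
    forM_ [0 .. n - 1] $ \wid -> forkIO $ do
        replicateM_ rounds $ do
            -- Fixed stagger (the original uses randomRIO); higher ids wake first.
            threadDelay ((n - wid) * 1000)
            atomically $ waitTurn n turn wid
            atomically $ modifyTVar' out (++ show wid)  -- the "putStr" step
            atomically $ moveAlong turn
        atomically $ modifyTVar' done (+ 1)
    -- Wait for all workers instead of sleeping for a fixed time.
    atomically $ readTVar done >>= check . (== n)
    readTVarIO out

main :: IO ()
main = runOrdered 10 3 >>= putStrLn
-- prints 012345678901234567890123456789
```

Because only the worker whose turn it is can get past `waitTurn`, and the counter advances only after that worker has logged its id, the order is fixed regardless of how long each worker sleeps.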

On 12/16/05, Peter Eriksen wrote:
module Main where

import Control.Concurrent
import Control.Concurrent.STM
import System.Random

worker :: Int -> TVar Int -> IO ()
worker id tv = do
    sleepingTime <- randomRIO (0, 50000)
    threadDelay sleepingTime
    putStr $ show id
You're printing the ID after a random sleep. Shouldn't be a big surprise that the output will be shuffled.
atomically $ barrier tv id
If you move 'putStr $ show id' down below the barrier then it'll behave like you want it to.
worker id tv
-- Friendly, Lemmih

On Fri, Dec 16, 2005 at 06:25:00PM +0100, Lemmih wrote:
If you move 'putStr $ show id' down below the barrier then it'll behave like you want it to.
However, the printed sequence may sometimes differ from expected because of races.

Best regards
Tomasz

-- 
I am searching for a programmer who is good at least in some of [Haskell, ML, C++, Linux, FreeBSD, math] for work in Warsaw, Poland

On Fri, Dec 16, 2005 at 06:51:12PM +0100, Lemmih wrote:
On 12/16/05, Peter Eriksen wrote:
threadDelay (10*10^6)
10*10^6 == 10e6, btw.
10e7.
-- 
Friendly,
Lemmih
_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe

On 12/16/05, Marcin Tustin wrote:
On Fri, Dec 16, 2005 at 06:51:12PM +0100, Lemmih wrote:
On 12/16/05, Peter Eriksen wrote:
threadDelay (10*10^6)
10*10^6 == 10e6, btw.
10e7.
Prelude> 10*10^6 == 10e6
True
Prelude> 10*10^6 == 10e7
False

10*10^6 == 1.0e7.

-- 
Friendly,
Lemmih

On Fri, Dec 16, 2005 at 07:02:03PM +0100, Lemmih wrote:
10*10^6 == 1.0e7.
Err yes, that's obviously correct. I think I must have misread "10e6" as "1e6", and then added to the superscript. D'oh.
-- Friendly, Lemmih

Lemmih wrote:
On 12/16/05, Peter Eriksen wrote:
threadDelay (10*10^6)
10*10^6 == 10e6, btw.
But the types are different. For sake of completeness:

(10*10^6, 10*10^^6, 10*10**6, 10e6)
    :: (Num a, Fractional b, Floating c, Fractional d) => (a, b, c, d)

threadDelay wants an Int, so 10e6 won't work.

Bertram
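To make the point about types concrete, here is a small sketch. The names `tenSeconds` and `tenSecondsD` are hypothetical and chosen only for illustration. It shows the integer expression being used directly as an `Int`, while the floating-point literal has to be converted (here with `round`) before it could be passed to `threadDelay`.

```haskell
module Main where

import Control.Concurrent (threadDelay)

-- Ten seconds in microseconds, as the Int that threadDelay expects.
tenSeconds :: Int
tenSeconds = 10 * 10 ^ (6 :: Int)

-- The same quantity written as a fractional literal; it cannot be an Int.
tenSecondsD :: Double
tenSecondsD = 10e6

main :: IO ()
main = do
    print tenSeconds                                -- 10000000
    print (fromIntegral tenSeconds == tenSecondsD)  -- True
    print (round tenSecondsD == tenSeconds)         -- True
    -- An explicit conversion is needed to use the Double with threadDelay.
    -- A short delay is used here so the example finishes quickly.
    threadDelay (round (tenSecondsD / 1000))
```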

Hi again,

Now I've actually tested the barrier implementation by counting the number of times each worker thread reaches the barrier. It's not a proof, but I take it as a strong indication that it's not as bad as I first thought. If all workers have run the same number of times (that is, at most one apart), then at least that's one good feature of the barrier. I think it works, though, and also keeps that invariant (at most one iteration apart) all the time.

Thank you for your kind help.

Regards,

Peter

****************

Here are the counts for runs with the barrier in different places:

=============
No barrier:

worker :: Int -> TVar Int -> TVar Int -> IO ()
worker id tv ic = do
    sleepingTime <- randomRIO (0, 50000)
    threadDelay sleepingTime
    putStr $ show id
    atomically $ (inc ic)
    worker id tv ic

(0,274) (1,272) (2,274) (3,278) (4,269) (5,287) (6,287) (7,275) (8,281) (9,274)

================
The barrier after putStr:

worker :: Int -> TVar Int -> TVar Int -> IO ()
worker id tv ic = do
    sleepingTime <- randomRIO (0, 50000)
    threadDelay sleepingTime
    putStr $ show id
    atomically $ barrier tv id
    atomically $ (inc ic)
    worker id tv ic

(0,199) (1,199) (2,199) (3,199) (4,198) (5,198) (6,198) (7,198) (8,198) (9,198)

===============
The barrier between threadDelay and putStr:

worker :: Int -> TVar Int -> TVar Int -> IO ()
worker id tv ic = do
    sleepingTime <- randomRIO (0, 50000)
    threadDelay sleepingTime
    atomically $ barrier tv id
    putStr $ show id
    atomically $ (inc ic)
    worker id tv ic

(0,202) (1,201) (2,201) (3,201) (4,201) (5,201) (6,201) (7,201) (8,201) (9,201)

Note: This is the one looking most like 0123456789012345... as I initially wanted, but of course there is a chance of a race where all worker threads, having passed the barrier in sequence, wait just before putStr. Then it would be random which one executed putStr first.

===============
The barrier is placed at the beginning, before threadDelay:

worker :: Int -> TVar Int -> TVar Int -> IO ()
worker id tv ic = do
    sleepingTime <- randomRIO (0, 50000)
    atomically $ barrier tv id
    threadDelay sleepingTime
    putStr $ show id
    atomically $ (inc ic)
    worker id tv ic

(0,200) (1,200) (2,200) (3,200) (4,200) (5,199) (6,200) (7,200) (8,199) (9,199)

=======================
=======================

Here's the full program:
module Main where

import Control.Concurrent
import Control.Concurrent.STM
import System.Random

worker :: Int -> TVar Int -> TVar Int -> IO ()
worker id tv ic = do
    sleepingTime <- randomRIO (0, 50000)
    threadDelay sleepingTime
    putStr $ show id
    atomically $ barrier tv id
    atomically $ (inc ic)
    worker id tv ic
Each worker sleeps for some time, then outputs its id and waits at the barrier for all the other workers to finish their sleep+output.
barrier :: TVar Int -> Int -> STM ()
barrier tv id = do
    passed <- readTVar tv
    if (passed `mod` 10 == id)
        then writeTVar tv (passed+1)
        else retry
The barrier is simply a global variable, tv, which holds the number of times any worker passed the barrier. Now, a worker may only pass the barrier iff the worker with an id one less just passed, or else it should block.
main :: IO ()
main = do
    tv <- atomically $ newTVar 0
    idCounts <- mapM (atomically . newTVar) [0,0,0,0,0,0,0,0,0,0]
    for [0..9] $ \i -> forkIO $ worker i tv (idCounts!!i)
    threadDelay (10*10^6)
    mapM_ (\(i,ic) -> (atomically $ readTVar ic) >>= \n -> print (i,n))
          (zip [0..9] idCounts)

for = flip mapM_

inc tvar = readTVar tvar >>= \n -> writeTVar tvar (n+1)
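The counts above suggest the workers stay within one iteration of each other. For comparison, a barrier in the more usual sense, where every worker waits until all of them have arrived, might be sketched as follows. The `Barrier` type, `await`, and `runRounds` are hypothetical names introduced for illustration, and the generation counter is one common way to make such a barrier reusable; none of this comes from the original program. A fixed stagger replaces the random sleep so the run is reproducible.

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad (forM_)
import Data.List (group)

-- A reusable barrier for a fixed number of parties.
data Barrier = Barrier
    { parties    :: Int       -- how many workers must arrive
    , arrived    :: TVar Int  -- arrivals in the current round
    , generation :: TVar Int  -- number of completed rounds
    }

newBarrier :: Int -> IO Barrier
newBarrier n = Barrier n <$> newTVarIO 0 <*> newTVarIO 0

-- Register arrival, then block until the round we arrived in is complete.
await :: Barrier -> IO ()
await b = do
    gen <- atomically $ do
        g <- readTVar (generation b)
        k <- readTVar (arrived b)
        if k + 1 == parties b
            then do                      -- last arrival releases everyone
                writeTVar (arrived b) 0
                writeTVar (generation b) (g + 1)
            else writeTVar (arrived b) (k + 1)
        return g
    atomically $ do
        g <- readTVar (generation b)
        check (g /= gen)

-- Each worker logs its round number, then waits at the barrier.
runRounds :: Int -> Int -> IO [Int]
runRounds n rounds = do
    b      <- newBarrier n
    logVar <- newTVarIO []
    done   <- newTVarIO (0 :: Int)
    forM_ [0 .. n - 1] $ \wid -> forkIO $ do
        forM_ [1 .. rounds] $ \r -> do
            threadDelay (wid * 200)      -- fixed stagger instead of randomRIO
            atomically $ modifyTVar' logVar (r :)
            await b
        atomically $ modifyTVar' done (+ 1)
    atomically $ readTVar done >>= check . (== n)
    reverse <$> readTVarIO logVar

main :: IO ()
main = do
    rs <- runRounds 10 3
    -- Every round's entries appear as one contiguous run of length 10.
    print (map length (group rs))
-- prints [10,10,10]
```

No worker can log round r+1 before all workers have logged round r, since passing the barrier for round r requires all of them to have arrived, so the workers are never more than one iteration apart.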
participants (6)
- Bertram Felgenhauer
- David Roundy
- Lemmih
- Marcin Tustin
- Peter Eriksen
- Tomasz Zielonka