
Hi,

I'm working on Concurrent Haskell, especially with the STM monad. I don't fully understand the way my program is executed. I think lazy evaluation leads to a loss of performance if we don't pay attention to this problem.

A short example will make this more explicit. Imagine this scenario: we have a set of threads (the workers), each of which has a result to compute (purely). When finished, each worker tries to save its result in a shared inbox, using STM. If the inbox is full, the thread waits until the inbox is empty. A specific thread watches the inbox: when it finds a value there, it prints the value on the screen (this could be any processing based on the value), then empties the inbox and waits for one of the remaining threads to add a new value.

Let "do_job" be the function executed by the threads (the workers):

    do_job :: (b -> a) -> b -> Inbox a -> IO ()
    do_job f input inbox = do { value <- return (f input)
                              ; atomically ( writeMsg inbox value ) }

The idea is: (f input) is the function to compute. Once it is computed, we want to save the result atomically. The problem is that, because of lazy evaluation, "value" is computed inside the atomic section and not before, resulting in a loss of efficiency. Indeed, to be fast, a concurrent program has to keep its atomic sections as "small" as possible, because they limit parallelism.
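One way to make sure the work happens outside the transaction is to force the result in the worker thread before calling `atomically`. A minimal sketch, assuming the deepseq package (distributed with GHC) is available; `doJobStrict` is a hypothetical name, not part of the original program. It uses `evaluate (force ...)` rather than `seq`, because `seq` only evaluates to weak head normal form (for a list, just the outermost constructor), while `force` evaluates the whole structure.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.DeepSeq (NFData, force)
import Control.Exception (evaluate)

-- Same inbox representation as in the post: a TVar holding at most one value.
type Inbox a = TVar (Maybe a)

writeMsg :: Inbox a -> a -> STM ()
writeMsg inbox value = do
  content <- readTVar inbox
  case content of
    Nothing -> writeTVar inbox (Just value)
    Just _  -> retry              -- inbox full: block until the reader empties it

readMsg :: Inbox a -> STM a
readMsg inbox = do
  content <- readTVar inbox
  case content of
    Nothing -> retry              -- inbox empty: block until a worker writes
    Just v  -> do
      writeTVar inbox Nothing
      return v

-- Hypothetical strict variant of do_job: the result is fully evaluated
-- in the worker thread, before the transaction is started.
doJobStrict :: NFData a => (b -> a) -> b -> Inbox a -> IO ()
doJobStrict f input inbox = do
  value <- evaluate (force (f input))  -- deep evaluation happens here, outside STM
  atomically (writeMsg inbox value)    -- the transaction only stores the result

main :: IO ()
main = do
  inbox <- newTVarIO Nothing
  _ <- forkIO (doJobStrict (\n -> sum [1 .. n]) (1000000 :: Integer) inbox)
  result <- atomically (readMsg inbox) -- retries until the worker has committed
  print result
```

Running it should print 500000500000. The point of using `evaluate` is that the evaluation is sequenced as an IO action in the worker thread, so the transaction only has to store an already computed value.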
To illustrate this, consider the following source code:

    module Main where

    import Control.Concurrent
    import Control.Concurrent.STM
    import Data.Maybe

    {-- Inbox --}

    type Inbox a = TVar (Maybe a)

    createInbox :: STM (Inbox a)
    createInbox = newTVar Nothing

    readMsg :: Inbox a -> STM a
    readMsg inbox = do { inboxContent <- readTVar inbox
                       ; if (isNothing inboxContent)
                           then retry
                           else do { writeTVar inbox Nothing
                                   ; return (fromJust inboxContent) } }

    writeMsg :: Inbox a -> a -> STM ()
    writeMsg inbox value = do { inboxContent <- readTVar inbox
                              ; if (isNothing inboxContent)
                                  then writeTVar inbox (Just value)
                                  else retry }

    {-- Workers --}

    do_job :: (b -> a) -> b -> Inbox a -> IO ()
    do_job f input inbox = do { value <- return (f input)
                              ; atomically ( writeMsg inbox value ) }

    do_jobs_in_threads :: [((b->a),b)] -> Inbox a -> TVar Int -> IO ()
    do_jobs_in_threads [] _ _ = return ()
    do_jobs_in_threads ((f,input):xs) inbox flag =
        do { forkIO_and_notify flag (do_job f input inbox)
           ; do_jobs_in_threads xs inbox flag }

    {-- Caller --}

    caller :: Inbox a -> (a -> IO ()) -> Int -> IO ()
    caller _ _ 0 = return ()
    caller inbox process n = do { msg <- atomically (readMsg inbox)
                                ; process msg
                                ; caller inbox process (n-1) }

    caller_in_thread flag inbox process n = forkIO_and_notify flag (caller inbox process n)

    {-- forkIO with notification --}

    create_flag :: IO (TVar Int)
    create_flag = atomically ( newTVar 0 )

    forkIO_and_notify :: TVar Int -> IO () -> IO ()
    forkIO_and_notify tvar action =
        do { atomically ( do { oldValue <- readTVar tvar
                             ; writeTVar tvar (oldValue + 1) } )
           ; forkIO ( do { action
                         ; atomically ( do { oldValue <- readTVar tvar
                                           ; writeTVar tvar (oldValue - 1) } ) } )
           -- ; putStrLn "Thread launched"
           ; return () }

    waitFlag flag = atomically ( do { valueflag <- readTVar flag
                                    ; if valueflag > 0 then retry else return () } )

    {-- main --}

    main :: IO ()
    main = do { flag <- create_flag
              ; inbox <- atomically (createInbox)
              ; caller_in_thread flag inbox (\x -> putStrLn ("Caller : " ++ show (length x))) 3
              ; do_jobs_in_threads [(perm,[1..11]),(perm,[1..8]),(perm,[1..3])] inbox flag
              ; waitFlag flag
              ; return () }
      where perm []             = [[]]
            perm (l:ls)         = injectett l (perm ls)
            injectett _ []      = []
            injectett x (l:ls)  = injecte x l ++ injectett x ls
            injecte x []        = [[x]]
            injecte x (l:ls)    = [x:l:ls] ++ map (l:) (injecte x ls)
            inputs = zip (replicate 3 perm) [[1..11],[1..8],[1..3]]

As you can see, we ask 3 threads to compute the permutations of [1..11], [1..8] and [1..3]. The "Caller" writes a message whenever a thread finishes. What we expect is that the second and third threads finish their work before the first one. But the output of this program is:

    Caller : 39916800
    Caller : 40320
    Caller : 6

...which means that threads 2 and 3 have to wait for the first thread before being able to save (and probably compute) their own results.

If I force the evaluation of "value" before the atomic section, by defining:

    do_job :: (b -> a) -> b -> Inbox a -> IO ()
    do_job f input inbox = do { value <- return (f input)
                              ; value `seq` atomically ( writeMsg inbox value ) }

then I obtain a more efficient program, as the output confirms:

    Caller : 6
    Caller : 40320
    Caller : 39916800

That's what we want, but what is the explanation of this behavior? STM is designed to be optimistic, not blocking. So does it mean that "value" is evaluated at commit time?

Do you know of any related problems, or of any work that could be useful on this subject?

Thanks for your help,
rde