Non-strict evaluation and concurrency (STM) : conflict?

Hi,

I'm working with Concurrent Haskell, especially the STM monad. I don't fully understand how my program is executed. I think lazy evaluation can cost performance if we don't pay attention to it. A short example will make this more explicit.

Imagine this scenario: we have a set of threads (the workers) that each have a result to compute (purely). When finished, each one tries to save its result in a shared inbox, using STM. If the inbox is full, the thread waits until the inbox is empty. A dedicated thread watches the inbox: when it finds a value there, it prints the value on the screen (as an example; it could be any processing based on the value), then empties the inbox and waits for one of the remaining threads to add a new value.

Let's define "do_job", the function executed by the threads (the workers):

    do_job :: (b -> a) -> b -> Inbox a -> IO ()
    do_job f input inbox = do { value <- return (f input)
                              ; atomically ( writeMsg inbox value ) }

The idea is: (f input) is the computation to perform. Once it is computed, we want to save the result atomically. The problem is that, because of lazy evaluation, "value" is computed in the atomic section, and not before, resulting in a loss of efficiency. Indeed, to be fast, a concurrent program has to keep its atomic sections as "small" as possible, because they limit parallelism.

To illustrate this, here is the source code:

    module Main where

    import Control.Concurrent
    import Control.Concurrent.STM
    import Data.Maybe
    import System.Random
    import System.IO

    {-- Inbox --}

    type Inbox a = TVar (Maybe a)

    createInbox :: STM (Inbox a)
    createInbox = newTVar Nothing

    -- Take the message out of the inbox; block (retry) while it is empty.
    readMsg :: Inbox a -> STM a
    readMsg inbox = do { inboxContent <- readTVar inbox
                       ; if (isNothing inboxContent)
                           then retry
                           else do { writeTVar inbox Nothing
                                   ; return (fromJust inboxContent) } }

    -- Put a message into the inbox; block (retry) while it is full.
    writeMsg :: Inbox a -> a -> STM ()
    writeMsg inbox value = do { inboxContent <- readTVar inbox
                              ; if (isNothing inboxContent)
                                  then writeTVar inbox (Just value)
                                  else retry }

    {-- Workers --}

    do_job :: (b -> a) -> b -> Inbox a -> IO ()
    do_job f input inbox = do { value <- return (f input)
                              ; atomically ( writeMsg inbox value ) }

    do_jobs_in_threads :: [((b->a),b)] -> Inbox a -> TVar Int -> IO ()
    do_jobs_in_threads [] _ _ = return ()
    do_jobs_in_threads ((f,input):xs) inbox flag =
        do { forkIO_and_notify flag (do_job f input inbox)
           ; do_jobs_in_threads xs inbox flag }

    {-- Caller --}

    caller :: Inbox a -> (a -> IO ()) -> Int -> IO ()
    caller _ _ 0 = return ()
    caller inbox process n = do { msg <- atomically (readMsg inbox)
                                ; process msg
                                ; caller inbox process (n-1) }

    caller_in_thread flag inbox process n =
        forkIO_and_notify flag (caller inbox process n)

    {-- forkIO with notification --}

    -- The flag counts the threads that are still running.
    create_flag = atomically ( newTVar 0 )

    forkIO_and_notify :: TVar Int -> IO () -> IO ()
    forkIO_and_notify tvar action =
        do { atomically ( do { oldValue <- readTVar tvar
                             ; writeTVar tvar (oldValue + 1) } )
           ; forkIO (do { action
                        ; atomically ( do { oldValue <- readTVar tvar
                                          ; writeTVar tvar (oldValue - 1) } ) } )
           -- ; putStrLn "Thread started"
           ; return () }

    waitFlag flag = atomically ( do { valueflag <- readTVar flag
                                    ; if valueflag > 0 then retry else return () } )

    {-- main --}

    main :: IO ()
    main = do { flag <- create_flag
              ; inbox <- atomically (createInbox)
              ; caller_in_thread flag inbox (\x -> putStrLn ("Caller : "++ (show (x)))) 3
              ; do_jobs_in_threads [(count,[1..11]),(count,[1..8]),(count,[1..3])] inbox flag
              ; waitFlag flag
              ; return () }
        where perm [] = [[]]
              perm (l:ls) = injectett l (perm ls)
              injectett _ [] = []
              injectett x (l:ls) = injecte x l ++ injectett x ls
              injecte x [] = [[x]]
              injecte x (l:ls) = [x:l:ls]++map (l:) (injecte x ls)
              -- each job returns the number of permutations, as in the output below
              count = length . perm
              inputs = zip (replicate 3 count) [[1..11],[1..8],[1..3]]

As you can see, we ask 3 threads to compute the permutations of [1..11], [1..8] and [1..3] and to report how many there are. The "Caller" writes a message each time a thread finishes. What we expect is that the second and third threads finish their work before the first one. But the output of this program is:

    Caller : 39916800
    Caller : 40320
    Caller : 6

... which means that threads 2 and 3 have to wait for the first thread before being able to save (and probably compute) their own results. If I force the evaluation of "value" before the atomic section, by defining:

    do_job :: (b -> a) -> b -> Inbox a -> IO ()
    do_job f input inbox = do { value <- return (f input)
                              ; value `seq` atomically ( writeMsg inbox value ) }

then I obtain a more efficient program, as the output confirms:

    Caller : 6
    Caller : 40320
    Caller : 39916800

That's what we want, but what is the explanation of this behavior? STM is designed to be optimistic, not blocking. So, does it mean that "value" is evaluated at "commit-time"? Do you know of any related problems, or of any work that could be useful on this subject?

Thanks for your help,
rde

Excerpts from Romain Demeyer's message of Tue Sep 28 09:06:53 -0400 2010:
> That's what we want, but what is the explanation of this behavior? STM is designed to be optimistic, not blocking. So, does it mean that the "value" is evaluated at "commit-time"? Do you know some problems that are related or do you know some works that can be useful at this subject?

STM's interface is lazy, so you have to tell Haskell when to evaluate computationally expensive thunks; otherwise it will wait until the last possible moment. This is a common stumbling block in concurrent programming. Don Stewart wrote strict-concurrency, which provides strict versions of MVars and Chans, though I don't see a strict STM class.

Edward
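To make the idea of a "strict" variable concrete, here is a minimal sketch using only base. The helper `putMVarStrict` is a hypothetical name invented for this illustration; it is not taken from strict-concurrency and makes no claim about how that package is implemented. It only forces the value to WHNF in the writing thread before storing it.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, tryTakeMVar)
import Control.Exception (ErrorCall, evaluate, try)
import Data.Maybe (isNothing)

-- Hypothetical helper (not from any library): force the value to WHNF
-- in the calling thread, then store the evaluated result.
putMVarStrict :: MVar a -> a -> IO ()
putMVarStrict mv x = evaluate x >>= putMVar mv

main :: IO ()
main = do
  -- Lazy put: only a thunk is stored, so the bottom is never triggered.
  lazyBox <- newEmptyMVar
  putMVar lazyBox (error "boom" :: Int)
  putStrLn "lazy put: stored an unevaluated thunk"

  -- Strict put: the writer evaluates the value, so the writer sees the error.
  strictBox <- newEmptyMVar
  r <- try (putMVarStrict strictBox (error "boom" :: Int))
  case r of
    Left e   -> putStrLn ("strict put: forced in the writer, got " ++ show (e :: ErrorCall))
    Right () -> putStrLn "strict put: stored"

  -- Nothing was stored by the failed strict put.
  leftover <- tryTakeMVar strictBox
  putStrLn ("strict box still empty: " ++ show (isNothing leftover))
```

The same pattern carries over to a TVar: evaluate in IO first, then run the transaction with the already evaluated value.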

On Tue, Sep 28, 2010 at 10:06 AM, Romain Demeyer
> Let's a function "do_job", the function to execute by the threads (the workers) :
>
>     do_job :: (b -> a) -> b -> Inbox a -> IO ()
>     do_job f input inbox = do { value <- return (f input)
>                               ; atomically ( writeMsg inbox value ) }

First of all, note that

    do v <- return (f x)
       ...

is exactly the same as

    do let v = f x
       ...

Now, if you want to evaluate your value to WHNF (Weak Head Normal Form), you may use

    import Control.Exception (evaluate)

    do_job :: (b -> a) -> b -> Inbox a -> IO ()
    do_job f input inbox = do
        value <- evaluate (f input)
        atomically (writeMsg inbox value)

This will work pretty well if your value is simple (e.g. an Int) but not so well if it is complex (e.g. a Data.Map), because 'evaluate' only evaluates as far as 'seq' does, i.e. to WHNF. You may then use the 'deepseq' package:

    import Control.DeepSeq

    do_job :: NFData a => (b -> a) -> b -> Inbox a -> IO ()
    do_job f input inbox =
        let value = f input
        in  value `deepseq` atomically (writeMsg inbox value)

This will fully evaluate the structure before calling 'writeMsg'.
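
The three levels of evaluation discussed above (`return`, `evaluate`, and deep forcing) can be compared side by side. The sketch below uses deliberately broken values as probes: an action "throws" exactly when it reaches the `undefined` part. The names `outcome`, `bottom` and `partial` are made up for this illustration, and it assumes the deepseq package is available.

```haskell
import Control.DeepSeq (force)
import Control.Exception (SomeException, evaluate, try)

-- Run an action and report whether it finished or raised an exception.
outcome :: IO a -> IO String
outcome act = fmap describe (try act)
  where
    describe :: Either SomeException b -> String
    describe (Left _)  = "threw"
    describe (Right _) = "ok"

-- A value that fails as soon as it is evaluated at all.
bottom :: Int
bottom = undefined

-- A list whose outermost constructor is fine but whose second element is not.
partial :: [Int]
partial = 1 : undefined : []

main :: IO ()
main = do
  a <- outcome (return bottom)             -- no evaluation
  b <- outcome (evaluate bottom)           -- WHNF of an Int is the whole value
  c <- outcome (evaluate partial)          -- WHNF stops at the first (:)
  d <- outcome (evaluate (force partial))  -- normal form reaches the undefined
  putStrLn ("return bottom:           " ++ a)  -- ok
  putStrLn ("evaluate bottom:         " ++ b)  -- threw
  putStrLn ("evaluate partial:        " ++ c)  -- ok
  putStrLn ("evaluate (force partial): " ++ d) -- threw
```

For a flat result such as an Int, `evaluate` (or `seq`) already does all the work; the deep version only matters once the result has structure.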
> That's what we want, but what is the explanation of this behavior? STM is designed to be optimistic, not blocking. So, does it mean that the "value" is evaluated at "commit-time"? Do you know some problems that are related or do you know some works that can be useful at this subject?

Those values are pure, so if you say

    writeMsg inbox (f x)

then internally a thunk is created referencing 'f' and 'x', and a pointer to that thunk is atomically committed, just like in the rest of the program. The value is not being evaluated by STM at all, as your STM functions don't need the value. In your program, it is evaluated when the caller thread prints the answer, as printing requires the value of the computation. If you didn't print, nothing would be computed at all. Lazy =).

HTH,

-- Felipe.
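
The claim that the transaction commits a thunk without looking inside it can be checked with a small sketch. It is single-threaded for simplicity: the "worker" and "caller" roles are just two steps of `main`, and the `error` call stands in for an expensive computation. The helper name `describe` is invented for this example.

```haskell
import Control.Concurrent.STM (atomically, newTVarIO, readTVar, writeTVar)
import Control.Exception (SomeException, evaluate, try)

-- Summarise what happened when the value was finally demanded.
describe :: Either SomeException Int -> String
describe (Left _)  = "evaluation happened here (and hit the error)"
describe (Right n) = "got " ++ show n

main :: IO ()
main = do
  box <- newTVarIO (Nothing :: Maybe Int)

  -- "Worker" side: the transaction stores a pointer to the thunk.
  -- It never inspects the value, so the error inside is not triggered.
  atomically (writeTVar box (Just (error "expensive computation")))
  putStrLn "worker: committed, nothing evaluated yet"

  -- "Caller" side: whoever demands the value first pays for it.
  content <- atomically (readTVar box)
  case content of
    Nothing -> putStrLn "caller: inbox empty"
    Just v  -> do
      r <- try (evaluate v)
      putStrLn ("caller: " ++ describe r)
```

If the commit evaluated its argument, the first `atomically` would fail; instead the failure only shows up on the reading side.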

Thanks for your help. It's clearer now.
> The value is not being evaluated by STM at all, as your STM functions don't need the value. In your program is evaluating when you print the answer in the main thread, as printing requires the value of the computation. If you didn't print, nothing would be computed at all. Lazy =).

Does it mean that the value is computed *by* the "caller", based on the thunk, and not by the worker itself? In that case, in this specific example, it would mean that this program does not exploit parallelism at all (unless deepseq or seq is used). I understand the principles (lazy evaluation, thunks, ...), but I'm surprised that no work has been done to "solve" this problem (in the sense that it's not intuitive to write concurrent programs in this context).

On Tue, Sep 28, 2010 at 11:38 AM, Romain Demeyer
> Does it mean that the value is computed by the "caller", based on the thunk, and not by the worker itself?

It is computed by the one who needs the value. Your worker doesn't. Note that the value is computed on 'print', which is *after* the worker has returned the value. So, it is the caller who evaluates, but it does not evaluate while calling the worker.

> In this case, in this specific example, it would mean that this program does not exploit the parallelism at all (except using deepseq or seq). I understand the principles (lazy evaluation, thunk,...) , but I'm surprised that no work has been done to "solve" this problem (in the sense that it's not intuitive to write concurrent programs in this context).

It is not a problem, it is a feature. It is our beloved lazy evaluation applied to STM. Alas, there isn't a single "solution". You may want 'seq', 'deepseq' or something in between. Your data may already be in WHNF or HNF, so always calling 'seq' or 'deepseq' would decrease performance for no gain in those cases. And sometimes you really want the lazy behaviour, for example if you are using an infinite data structure. It is only a problem while you are learning how to use concurrency or parallelism in Haskell. Just repeat to yourself that everything is lazy and you'll get used to it =).

Cheers,

-- Felipe.
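
As a small illustration of the infinite-structure case, the sketch below keeps an infinite list in a TVar. It relies on the write being lazy: only a thunk is stored, and the reader evaluates just the prefix it needs. An always-deep variable would never finish storing this value.

```haskell
import Control.Concurrent.STM (newTVarIO, readTVarIO)

main :: IO ()
main = do
  -- Store an infinite list. Only a thunk is written, so this returns at once.
  box <- newTVarIO ([1 ..] :: [Integer])

  -- The reader decides how much of the structure to evaluate.
  xs <- readTVarIO box
  print (take 5 (map (* 2) xs))  -- prints [2,4,6,8,10]
```

Forcing this value with `seq` would be harmless (WHNF is just the first cons cell), but `deepseq` on it would not terminate.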

Romain Demeyer wrote:
> Imagine this scenario : we have a set of threads (the workers) that have (each) a result to compute (purely). When finished, they try to save the result in an shared inbox, using STM. If the inbox is full, the thread waits until the inbox is empty. A specific thread is looking at the inbox: when it finds a value in the inbox, it prints the value on the screen (for example, it could be any processing based on the value) and then empty the inbox and wait that a remaining thread add a new value).

The technical reason this does not work as expected was given by others (lazy evaluation, etc.). One additional remark: it seems you are using STM for parallelism, i.e. to enhance performance, not for explicit concurrency. (Otherwise it would make no difference to you which thread actually evaluates some expression.) This can be done much more easily with the `par` combinator and friends (http://hackage.haskell.org/package/parallel). The same caveat wrt laziness applies to this method, but your code will become a lot simpler: no need to explicitly manage threads, and purely functional (non-monadic) code.

Cheers
Ben
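
A minimal sketch of that approach, assuming the parallel package is installed. It uses `parMap` with the `rseq` strategy from Control.Parallel.Strategies rather than raw `par`, and `Data.List.permutations` stands in for the hand-written `perm` from the original post. The name `countPerms` is invented for this example.

```haskell
import Control.Parallel.Strategies (parMap, rseq)
import Data.List (permutations)

-- Count the permutations of a list (a stand-in for each worker's job).
countPerms :: [Int] -> Int
countPerms = length . permutations

main :: IO ()
main =
  -- Each list element is sparked for parallel evaluation. `rseq` forces
  -- each result to WHNF, which for an Int is the complete value.
  print (parMap rseq countPerms [[1 .. 9], [1 .. 8], [1 .. 3]])
  -- prints [362880,40320,6]
```

To actually run the sparks on several cores, the program has to be built with GHC's `-threaded` flag and started with `+RTS -N`. With a result type that has structure, `rseq` would again stop at WHNF, which is the laziness caveat mentioned above.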

participants (4)
- Ben Franksen
- Edward Z. Yang
- Felipe Lessa
- Romain Demeyer