
I would use bounded STM channels (from the stm-chans package) for communication; this keeps the producer from getting too far ahead of the converters. You'd also need to tag items as they're produced (an Integer should be fine) and keep track of the tags; a TVar should suffice for that.
The basic outline is that the producer writes to a channel. Each converter thread reads from that channel and, when it has finished, checks the output-index TVar. If the converter's item index equals the current output index, the converter puts its value into an output channel and increments the output index. A final consumer reads from the output channel and processes each item in turn.
Alternatively, instead of a bounded input channel, the producer could write to a TMVar. Which is better probably depends on the details of your production pattern.
You certainly could use something like iteratee-stm or the conduits variant, but they wouldn't directly help with concurrency of converters, nor with synchronization. What they would give you is concurrency between the producer, converter, and consumer. Of course you could build your own converter step to work within that framework.
John L.
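Here is a minimal sketch of this outline. It assumes only the `stm` package: `TBQueue` from `Control.Concurrent.STM` stands in for the stm-chans bounded channels, and items are tagged with an `Integer` counter held in a TVar. A converter whose tag is not yet current blocks via STM `check` (which retries) until the output index catches up. The name `orderedDriver`, the queue capacity of 16 and the `Maybe` end-of-stream markers are illustrative choices, not part of the original suggestion. The `Producer`/`Converter`/`Consumer` types are copied from Roel's message below.

```haskell
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception (evaluate)
import Control.Monad (forM_, replicateM, replicateM_)

type Producer a = (a -> IO ()) -> IO ()
type Converter a b = a -> b
type Consumer b = b -> IO ()

-- Hypothetical driver: nWorkers converter threads, results consumed in production order.
orderedDriver :: Int -> Producer a -> Converter a b -> Consumer b -> IO ()
orderedDriver nWorkers producer converter consumer = do
  input   <- newTBQueueIO 16            -- bounded: producer blocks when it is full
  output  <- newTBQueueIO 16            -- bounded: ordered results for the consumer
  nextTag <- newTVarIO (0 :: Integer)   -- tag for the next produced item
  nextOut <- newTVarIO (0 :: Integer)   -- tag of the next item allowed into output

  -- Producer thread: tag each item, then send one stop marker per worker.
  _ <- forkIO $ do
    producer $ \x -> atomically $ do
      i <- readTVar nextTag
      writeTVar nextTag (i + 1)
      writeTBQueue input (Just (i, x))
    replicateM_ nWorkers (atomically (writeTBQueue input Nothing))

  -- Converter threads: each fills its MVar when it sees a stop marker.
  dones <- replicateM nWorkers $ do
    done <- newEmptyMVar
    let loop = do
          item <- atomically (readTBQueue input)
          case item of
            Nothing -> putMVar done ()
            Just (i, x) -> do
              y <- evaluate (converter x)  -- do the CPU work here (to WHNF only)
              atomically $ do
                cur <- readTVar nextOut
                check (cur == i)           -- retry until it is this item's turn
                writeTBQueue output (Just y)
                writeTVar nextOut (i + 1)
              loop
    _ <- forkIO loop
    return done

  -- Once every converter has stopped, mark the end of the output stream.
  _ <- forkIO $ do
    mapM_ takeMVar dones
    atomically (writeTBQueue output Nothing)

  -- The consumer runs in the calling thread, so orderedDriver returns when it is done.
  let consume = do
        r <- atomically (readTBQueue output)
        case r of
          Nothing -> return ()
          Just y  -> consumer y >> consume
  consume

main :: IO ()
main = do
  n <- getNumCapabilities
  orderedDriver (max 2 n) (\cb -> forM_ [1 .. 10 :: Int] cb) (* 10) print
```

Compiled with `-threaded` and run with `+RTS -N`, this should print 10, 20, ..., 100 in order while the conversions run in parallel. `evaluate` only forces weak head normal form, so a converter that returns a lazy structure would need something like `force` from the deepseq package to make sure the work really happens in the worker thread. The sketch has no error handling: a converter that throws would leave its tag unpublished and stall the pipeline.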
Date: Mon, 13 Feb 2012 16:12:22 +0100
From: Roel van Dijk
Subject: [Haskell-cafe] How to increase performance using concurrency for sequential producer-consumer problem
To: Haskell Café
Hello,
I have a program which I believe can benefit from concurrency. But I am wondering if the mechanisms I need already exist somewhere on Hackage.
Here is a sketch of my program, in literate Haskell:
module Problem where
import Control.Monad ( forM_ )
The producer produces values. It blocks until there are no more values to produce. Each value is given to a callback function.
type Producer a = (a -> IO ()) -> IO ()
The converter does some work with a value. This work is purely CPU-bound and is the bottleneck of the program. The amount of work it has to do varies from value to value.
type Converter a b = a -> b
The consumer does something with the value calculated by the converter. It is very important that the consumer consumes the values in the same order as they are produced.
type Consumer b = b -> IO ()
Dummy producer, converter and consumer:
producer :: Producer Int
producer callback = forM_ [1..10] callback
converter :: Converter Int Int
converter = (*10)
consumer :: Consumer Int
consumer = print
A simple driver. Does not exploit concurrency.
simpleDriver :: Producer a -> Converter a b -> Consumer b -> IO ()
simpleDriver producer converter consumer = producer (consumer . converter)
current_situation :: IO ()
current_situation = simpleDriver producer converter consumer
Ideally I would like a driver that spawns a worker thread for each core in my system. The trick is ensuring that the consumer is offered results in the same order as they are generated by the producer.
I can envision that some kind of storage is necessary to keep track of results that cannot yet be offered to the consumer because it is still waiting for an earlier result.
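The storage described here is often called a reorder buffer. Below is a minimal pure sketch that assumes the `containers` package. The names `Reorder`, `insertResult` and `releaseAll` are hypothetical. Finished results are keyed by their production index, and each insertion releases the longest run of consecutive results starting at the index the consumer is waiting for. In a concurrent driver, the consumer thread would own this state while converters send `(tag, result)` pairs over an unordered channel. This differs from the output-index TVar scheme in the reply, where converters wait for their turn.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical reorder buffer: early results wait here, keyed by their tag.
data Reorder b = Reorder
  { nextWanted :: !Int              -- tag the consumer needs next
  , pending    :: !(Map.Map Int b)  -- finished results that arrived too early
  }

emptyReorder :: Reorder b
emptyReorder = Reorder 0 Map.empty

-- Store one finished result and release every result that is now in order.
insertResult :: Int -> b -> Reorder b -> ([b], Reorder b)
insertResult tag x (Reorder next buf) = release next (Map.insert tag x buf)
  where
    release n m = case Map.lookup n m of
      Just y  -> let (ys, r) = release (n + 1) (Map.delete n m) in (y : ys, r)
      Nothing -> ([], Reorder n m)

-- Feed (tag, result) pairs in arrival order; get results back in tag order.
releaseAll :: [(Int, b)] -> [b]
releaseAll = go emptyReorder
  where
    go _ [] = []
    go r ((t, x) : rest) = let (out, r') = insertResult t x r in out ++ go r' rest

main :: IO ()
main = print (releaseAll [(2, 'c'), (0, 'a'), (3, 'd'), (1, 'b')])  -- prints "abcd"
```

Nothing in this sketch limits how many early results can pile up behind one slow item. If memory matters, the workers would also have to stop taking new items once the buffer grows past some bound.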
Is there any package on Hackage that can help me with this problem? Or do I have to implement it using lower-level concurrency primitives?
Regards, Roel