
That's a really good idea. Thanks for the insight :)

On 31 May 2010 19:33, Dean Herington & Elizabeth Lacey <heringtonlacey@mindspring.com> wrote:
At 10:06 AM +0100 5/25/10, Benjamin Edwards wrote:
NB: This was posted in fa.haskell first; I guess it was the wrong forum for this kind of question, as it was left unanswered :)
Hi,
I'm having a few issues getting some toy programs to work whilst I try to get a better understanding of how to model processes and channels. I am just trying to use the basic building blocks and failing miserably. Here is an example (yes, this is utterly contrived and silly, but I lack imagination... sue me):
I want my main thread to do the following:
1. Make a channel.
2. Spawn a thread (producer) that will write a series of lists of integers to the channel, then exit.
3. Spawn another thread that will read from the channel and sum all of the input. It should exit when both the channel is empty and the producer thread has finished writing to it.
4. Main thread should print the sum.
My current code uses a trick I have seen elsewhere, which is to have the result of the "task" running in the thread put into an MVar. So my condition for the reading thread exiting is to check whether the MVar of the producer thread is not empty and the channel is empty. If those two things are true, exit the thread. Unfortunately it somehow seems able to get to a stage where the producer thread has finished and the channel is empty, but it is blocking on a read.
I have the following code, but it always blocks indefinitely on a read. I am sure there is something obviously deficient with it, but I can't work out what it is. Any help would be greatly appreciated. Of course, if I'm doing it all wrong, please tell me that too :)
module Main where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (forever)
import Data.Map as M

main :: IO ()
main = do
  oc <- newChan
  counter <- newTVarIO (0 :: Integer)
  p <- forkJoin $ produce oc [1..1000]
  c <- forkJoin $ loop oc p counter
  takeMVar c >>= print

produce :: Chan [Integer] -> [Integer] -> IO ()
produce ch [] = return ()
produce ch xs = do
  let (hs,ts) = splitAt 100 xs
  writeChan ch hs
  produce ch ts

loop :: Chan [Integer] -> MVar () -> TVar Integer -> IO Integer
loop ch p n = do
  f <- isEmptyMVar p
  e <- isEmptyChan ch
  if e && (not f)
    then atomically (readTVar n)
    else do
      xs <- readChan ch
      atomically $ do
        x <- readTVar n
        writeTVar n (x + sum xs)
      loop ch p n

forkJoin :: IO a -> IO (MVar a)
forkJoin task = do
  mv <- newEmptyMVar
  forkIO (task >>= putMVar mv)
  return mv
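
One interleaving that could explain the hang in the code above: the consumer checks the producer's MVar just before the producer finishes, sees it still empty, and then commits to a readChan that nothing will ever satisfy. The sketch below replays that ordering in a single thread, so it is only an illustration under that assumption, not a trace of the original program. The name staleCheckDemo and the 0.2 second limit are made up for the example; timeout (from System.Timeout) is there only so the demo returns instead of hanging.

```haskell
module Main where

import Control.Concurrent
import System.Timeout (timeout)

-- Hypothetical helper: replays "check, then producer finishes, then read"
-- in one thread to show why acting on the earlier check can block.
staleCheckDemo :: IO (Bool, Maybe [Integer])
staleCheckDemo = do
  ch   <- newChan          -- stands in for the shared channel, already drained
  done <- newEmptyMVar     -- stands in for the producer's termination MVar

  -- Step 1: the consumer asks "is the producer still running?"
  stillRunning <- isEmptyMVar done

  -- Step 2: the producer finishes right after that check.
  putMVar done ()

  -- Step 3: the consumer acts on the stale answer and reads.
  -- No writer is left, so the read can only end via the timeout.
  r <- timeout 200000 (readChan ch)
  return (stillRunning, r)

main :: IO ()
main = staleCheckDemo >>= print
```

Running this should print (True,Nothing): the check reported a live producer, and the read that followed never received anything. The two tests in loop are separate actions, so the state can change between them and the blocking read.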
By encoding end-of-data directly in the channel contents, you can simplify the code (and make it less prone to hangs such as the one you experienced). I've shown one way to do this below. I've also made the accumulating count a simple parameter to the consumer function. (Because the count is private to the consumer until it's passed to the main routine via the consumer's termination MVar, there's no need for additional inter-thread synchronization.)
Dean
module Main where

import Control.Concurrent

main :: IO ()
main = do
  oc <- newChan
  p <- forkJoin $ produce oc [1..1000]
  c <- forkJoin $ consume oc 0
  takeMVar c >>= print

produce :: Chan (Maybe [Integer]) -> [Integer] -> IO ()
produce ch [] = writeChan ch Nothing
produce ch xs = do
  let (hs,ts) = splitAt 100 xs
  writeChan ch (Just hs)
  produce ch ts

consume :: Chan (Maybe [Integer]) -> Integer -> IO Integer
consume ch cnt = do
  mbInts <- readChan ch
  case mbInts of
    Just xs -> consume ch (cnt + sum xs)
    Nothing -> return cnt

forkJoin :: IO a -> IO (MVar a)
forkJoin task = do
  mv <- newEmptyMVar
  forkIO (task >>= putMVar mv)
  return mv
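
A quick way to try the end-of-data marker on a few edge cases is sketched below. It reuses consume and forkJoin as written above; produceChunks and runSum are hypothetical names added for the experiment, and the chunk size is turned into a parameter (assumed to be positive) so that uneven final chunks and an empty input can be checked as well.

```haskell
module Main where

import Control.Concurrent

-- Hypothetical variant of produce with the chunk size as a parameter.
-- Assumes n > 0; with n <= 0, splitAt would never shrink the list.
produceChunks :: Int -> Chan (Maybe [Integer]) -> [Integer] -> IO ()
produceChunks _ ch [] = writeChan ch Nothing   -- end-of-data marker
produceChunks n ch xs = do
  let (hs, ts) = splitAt n xs
  writeChan ch (Just hs)
  produceChunks n ch ts

-- Sums chunks until the end-of-data marker arrives.
consume :: Chan (Maybe [Integer]) -> Integer -> IO Integer
consume ch cnt = do
  mbInts <- readChan ch
  case mbInts of
    Just xs -> consume ch (cnt + sum xs)
    Nothing -> return cnt

-- Runs a task in a new thread; the MVar is filled with its result.
forkJoin :: IO a -> IO (MVar a)
forkJoin task = do
  mv <- newEmptyMVar
  _ <- forkIO (task >>= putMVar mv)
  return mv

-- Hypothetical helper: one producer, one consumer, returns the total.
runSum :: Int -> [Integer] -> IO Integer
runSum n xs = do
  ch <- newChan
  _ <- forkJoin (produceChunks n ch xs)
  c <- forkJoin (consume ch 0)
  takeMVar c

main :: IO ()
main = do
  a <- runSum 100 [1..1000]   -- even chunks, as in the message above
  b <- runSum 7 [1..1000]     -- last chunk is shorter than the rest
  c <- runSum 100 []          -- no data: only the marker is sent
  print (a, b, c)
```

This should print (500500,500500,0). In each case the consumer stops because it reads Nothing, so it never has to ask whether the producer has finished.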