Learning about channels

NB: This was posted in fa.haskell first; I guess it was the wrong forum for this kind of question, as it was left unanswered :)
Hi,
I'm having a few issues getting some toy programs to work while I try to get a better understanding of how to model processes and channels. I am just trying to use the basic building blocks and failing miserably. Here is an example (yes, this is utterly contrived and silly, but I lack imagination... sue me):
I want my main thread to do the following:
1. Make a channel.
2. Spawn a thread (producer) that will write a series of lists of integers to the channel, then exit.
3. Spawn another thread that will read from the channel and sum all of the input. It should exit when both the channel is empty and the producer thread has finished writing to it.
4. The main thread should print the sum.
My current code uses a trick I have seen elsewhere, which is to have the result of the "task" running in the thread put into an MVar. So my condition for the reading thread exiting is to check whether the producer thread's MVar is not empty and the channel is empty. If both are true, exit the thread. Unfortunately, it somehow seems able to get to a stage where the producer thread has finished and the channel is empty, but the reader is blocking on a read.
I have the following code, but it always blocks indefinitely on a read. I am sure there is something obviously deficient with it, but I can't work out what it is. Any help would be greatly appreciated. Of course, if I'm doing it all wrong, please tell me that too :)

module Main where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (forever)
import Data.Map as M

main :: IO ()
main = do
  oc <- newChan
  counter <- newTVarIO (0 :: Integer)
  p <- forkJoin $ produce oc [1..1000]
  c <- forkJoin $ loop oc p counter
  takeMVar c >>= print

produce :: Chan [Integer] -> [Integer] -> IO ()
produce ch [] = return ()
produce ch xs = do
  let (hs,ts) = splitAt 100 xs
  writeChan ch hs
  produce ch ts

loop :: Chan [Integer] -> MVar () -> TVar Integer -> IO Integer
loop ch p n = do
  f <- isEmptyMVar p
  e <- isEmptyChan ch
  if e && (not f)
    then atomically (readTVar n)
    else do
      xs <- readChan ch
      atomically $ do
        x <- readTVar n
        writeTVar n (x + sum xs)
      loop ch p n

forkJoin :: IO a -> IO (MVar a)
forkJoin task = do
  mv <- newEmptyMVar
  forkIO (task >>= putMVar mv)
  return mv

On Tuesday 25 May 2010 11:06:48, Benjamin Edwards wrote:
My current code uses a trick I have seen elsewhere, which is to have the result of the "task" running in the thread put into an MVar.
That's good.
So my condition for the reading thread exiting is to check whether the producer thread's MVar is not empty and the channel is empty. If both are true, exit the thread. Unfortunately, it somehow seems able to get to a stage where the producer thread has finished and the channel is empty, but the reader is blocking on a read.
I think it gets to the state where the channel is empty but the producer thread hasn't finished yet. Then loop goes on to readChan, and if the producer exits without writing another chunk, that read can never complete.
I have the following code, but it always blocks indefinitely on a read. I am sure there is something obviously deficient with it, but I can't work out what it is. Any help would be greatly appreciated. Of course, if I'm doing it all wrong, please tell me that too :)
module Main where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (forever)
import Data.Map as M

main :: IO ()
main = do
  oc <- newChan
  counter <- newTVarIO (0 :: Integer)
  p <- forkJoin $ produce oc [1..1000]
  c <- forkJoin $ loop oc p counter
  takeMVar c >>= print

produce :: Chan [Integer] -> [Integer] -> IO ()
produce ch [] = return ()
produce ch xs = do
  let (hs,ts) = splitAt 100 xs
  writeChan ch hs
  produce ch ts

loop :: Chan [Integer] -> MVar () -> TVar Integer -> IO Integer
loop ch p n = do
  f <- isEmptyMVar p
  e <- isEmptyChan ch
  if e && (not f)
    then atomically (readTVar n)
Sorry for the ugly layout:

    else do
      if e
        then yield
        else do
          xs <- readChan ch
          atomically $ do
            x <- readTVar n
            writeTVar n (x + sum xs)
      loop ch p n

The point is, if the channel is empty but the producer has not yet finished, don't try to read from the channel (that wouldn't work then); instead, give the producer the chance to produce the next chunk. Since thread switching happens on allocation, don't just jump to the next iteration of the loop, but tell the thread manager "I have nothing to do at the moment, you can let somebody else run for a while".
I have encountered cases where yield didn't work reliably (no idea whether that was my fault or the compiler's), but "threadDelay 0" instead of yield worked reliably.
    else do
      xs <- readChan ch
      atomically $ do
        x <- readTVar n
        writeTVar n (x + sum xs)
      loop ch p n

forkJoin :: IO a -> IO (MVar a)
forkJoin task = do
  mv <- newEmptyMVar
  forkIO (task >>= putMVar mv)
  return mv
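Daniel's version avoids the blocking read by polling with yield. A different technique, swapped in here and not proposed in this thread, is to make "read a chunk, or notice that the producer is done" a single STM transaction, so there is no window between the two checks and no busy-waiting. The sketch below assumes the stm package (which the original post already imports) and replaces the Chan and the producer's MVar with a TChan and a hypothetical TVar Bool flag named done; sumViaTChan is likewise an illustrative name.

```haskell
import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM

-- Same helper as in the thread: run a task in a new thread and
-- deliver its result through an MVar.
forkJoin :: IO a -> IO (MVar a)
forkJoin task = do
  mv <- newEmptyMVar
  _ <- forkIO (task >>= putMVar mv)
  return mv

-- Producer: writes chunks of 100, then sets the (hypothetical) 'done'
-- flag, which stands in for the producer's termination MVar.
produce :: TChan [Integer] -> TVar Bool -> [Integer] -> IO ()
produce _ done [] = atomically (writeTVar done True)
produce ch done xs = do
  let (hs, ts) = splitAt 100 xs
  atomically (writeTChan ch hs)
  produce ch done ts

-- Consumer: one transaction either takes a chunk, or, if the channel is
-- empty, succeeds only when the producer has finished. Otherwise both
-- branches retry and the thread sleeps until 'ch' or 'done' changes.
consume :: TChan [Integer] -> TVar Bool -> Integer -> IO Integer
consume ch done acc = do
  next <- atomically $
    (Just <$> readTChan ch)
      `orElse` (readTVar done >>= check >> return Nothing)
  case next of
    Just xs -> consume ch done (acc + sum xs)
    Nothing -> return acc

-- Wire producer and consumer together and wait for the sum.
sumViaTChan :: [Integer] -> IO Integer
sumViaTChan input = do
  ch <- newTChanIO
  done <- newTVarIO False
  _ <- forkJoin (produce ch done input)
  c <- forkJoin (consume ch done 0)
  takeMVar c

main :: IO ()
main = sumViaTChan [1 .. 1000] >>= print
```

Because the producer sets done only after its last writeTChan has committed, a transaction that sees done as True and an empty channel knows every chunk has already been consumed; that is exactly the condition the original loop tried to establish with two separate, non-atomic checks.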

This is where I was getting it all horribly wrong. I assumed that while
On 25 May 2010 11:00, Daniel Fischer

Having read this http://haskell.org/ghc/docs/6.12.1/html/libraries/base-4.2.0.0/Control-Concu... page a bit more, I think I understand why the program was blocking, but if I compile with -threaded, surely the readChan function shouldn't prevent the producer from producing?

On Tuesday 25 May 2010 12:35:38, Benjamin Edwards wrote:
Having read this <http://haskell.org/ghc/docs/6.12.1/html/libraries/base-4.2.0.0/Control-Concurrent.html> page a bit more, I think I understand why the program was blocking, but if I compile with -threaded, surely the readChan function shouldn't prevent the producer from producing?
I'm afraid that is not so: "The downside of having lightweight threads is that only one can run at a time". I compiled the original with -threaded and got:
$ ./chanTest +RTS -N2
chanTest: thread blocked indefinitely in an MVar operation
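Chan is built on MVars, so a readChan that no thread can ever satisfy surfaces as this MVar exception. A minimal sketch of GHC's deadlock detection (waitOnOrphanMVar is a hypothetical name): the thread blocks on an MVar that nothing else references, and the runtime throws BlockedIndefinitelyOnMVar to it, which try can catch. The check is performed by the garbage collector, so it relies on the runtime getting around to a GC, which it does under default settings.

```haskell
import Control.Concurrent (MVar, newEmptyMVar, takeMVar)
import Control.Exception (BlockedIndefinitelyOnMVar, try)

-- Wait on an MVar that no other thread holds a reference to, so it can
-- never be filled. GHC's runtime notices that this thread can never be
-- woken and throws BlockedIndefinitelyOnMVar to it; 'try' returns that
-- as a Left instead of letting it kill the program.
waitOnOrphanMVar :: IO (Either BlockedIndefinitelyOnMVar ())
waitOnOrphanMVar = do
  mv <- newEmptyMVar :: IO (MVar ())
  try (takeMVar mv)

main :: IO ()
main = do
  r <- waitOnOrphanMVar
  case r of
    Left e -> putStrLn ("caught: " ++ show e)
    Right () -> putStrLn "the MVar was filled"
```

In the original program the same mechanism presumably fires once the consumer is stuck in readChan after the producer has exited: the main thread's takeMVar c can then never succeed either, and since it is not caught there, the runtime reports it as shown above.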

At 10:06 AM +0100 5/25/10, Benjamin Edwards wrote:
I have the following code, but it always blocks indefinitely on a read. I am sure there is something obviously deficient with it, but I can't work out what it is. Any help would be greatly appreciated. Of course, if I'm doing it all wrong, please tell me that too :)
By encoding end-of-data directly in the channel contents, you can simplify the code (and make it less prone to hangs such as the one you experienced). I've shown one way to do this below. I've also made the accumulating count a simple parameter to the consumer function. (Because the count is private to the consumer until it's passed to the main routine via the consumer's termination MVar, there's no need for additional inter-thread synchronization.)
Dean

module Main where

import Control.Concurrent

main :: IO ()
main = do
  oc <- newChan
  p <- forkJoin $ produce oc [1..1000]
  c <- forkJoin $ consume oc 0
  takeMVar c >>= print

produce :: Chan (Maybe [Integer]) -> [Integer] -> IO ()
produce ch [] = writeChan ch Nothing
produce ch xs = do
  let (hs,ts) = splitAt 100 xs
  writeChan ch (Just hs)
  produce ch ts

consume :: Chan (Maybe [Integer]) -> Integer -> IO Integer
consume ch cnt = do
  mbInts <- readChan ch
  case mbInts of
    Just xs -> consume ch (cnt + sum xs)
    Nothing -> return cnt

forkJoin :: IO a -> IO (MVar a)
forkJoin task = do
  mv <- newEmptyMVar
  forkIO (task >>= putMVar mv)
  return mv
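The end-of-data sentinel also combines with getChanContents from Control.Concurrent, which exposes a channel as a lazily read list. A sketch of that variant, keeping Dean's produce unchanged (consumeLazily, untilNothing and sumViaSentinel are hypothetical names): the consumer stops at the first Nothing and never forces the list past it, which matters because demanding one more element would block forever.

```haskell
import Control.Concurrent

forkJoin :: IO a -> IO (MVar a)
forkJoin task = do
  mv <- newEmptyMVar
  _ <- forkIO (task >>= putMVar mv)
  return mv

-- Dean's producer: chunks of 100 wrapped in Just, then a Nothing sentinel.
produce :: Chan (Maybe [Integer]) -> [Integer] -> IO ()
produce ch [] = writeChan ch Nothing
produce ch xs = do
  let (hs, ts) = splitAt 100 xs
  writeChan ch (Just hs)
  produce ch ts

-- Items up to (not including) the first Nothing; nothing after the
-- sentinel is ever forced.
untilNothing :: [Maybe a] -> [a]
untilNothing (Just x : rest) = x : untilNothing rest
untilNothing _ = []

-- Consumer variant: read the channel as a lazy list. Each element is
-- only read from the channel when the sum demands it.
consumeLazily :: Chan (Maybe [Integer]) -> IO Integer
consumeLazily ch = do
  msgs <- getChanContents ch
  return $! sum (concat (untilNothing msgs))

sumViaSentinel :: [Integer] -> IO Integer
sumViaSentinel input = do
  oc <- newChan
  _ <- forkJoin (produce oc input)
  c <- forkJoin (consumeLazily oc)
  takeMVar c

main :: IO ()
main = sumViaSentinel [1 .. 1000] >>= print
```

The return $! forces the sum inside the consumer thread; without it, an unevaluated sum would be handed to main, and the channel would end up being read by whichever thread finally demands the value.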

I'm sorry if this matter has already been discussed, but I'm going nuts here. Attached is the code for a small program, an uber-simplification of something I'm trying to do which would gain enormously from lazy serialization. The code, however, is broken... It runs, and does the job, but it does so strictly.
It's more a self-imposed exercise than anything else, but I'd really like to understand what's going on with this snippet, and why it didn't work as I thought it would.
The objective is to read a binary file, checking whether a particular bit (bit zero) is set or not. My idea was to use the Get monad to get one Word8 at a time, do the check, and cons the True/False result of that check onto a "results list". The reason for this results list is that I'll later read through these results, and it would be great if I could do so lazily, aiming for the producer-consumer pattern.
As you'll see, my code fails to produce the results list lazily. At first I thought that the list would only escape the Get monad if fully evaluated. So I added the 'testIn' function, which offers only the head of that list, running inside the Get monad... but even this triggers the full traversal of the file. I've attempted several combinations of "let", trying to induce laziness, but always to no avail. I am at a loss. Any help is most welcome.
--- BEGIN CODE ---
import Data.Bits (testBit)
import Data.Word
import System.IO (openBinaryFile, withBinaryFile, IOMode(..))
import Data.Binary.Get
import qualified Data.ByteString.Lazy as B
import Control.Monad (liftM, liftM2)

-- Assumed: Symbol is defined in the attachment but not shown here;
-- since `checker s` needs a Bool, it must be a synonym for Bool.
type Symbol = Bool

-- | Check the LSB in a word against the symbol.
check :: Bool -> Word8 -> Bool
{-# INLINE check #-}
check s w = testBit w 0 == s

-- Algorithm to implement:
-- - get a word from lazy buffer.
-- - check whether 'least/most' significant bit is as expected.
-- - cons result in output buffer.
-- The result contains a stream of "checks".
checker :: Bool -> Get Bool
checker s = getWord8 >>= return . check s

go :: Symbol -> Get [Bool]
go s = do
  eof <- isEmpty
  case eof of
    True -> return []
    False -> let res = liftM2 (:) (checker s) (go s) in res

--
-- Work inside the Get monad.

-- | return the head of the results... this shouldn't take long!
testIn :: Get Bool
testIn = liftM (head) (go True)

--
--
-- gimme only the head of the results list
runnerIn :: IO Bool
runnerIn = openBinaryFile testFile ReadMode >>= B.hGetContents >>= return . runGet testIn

test = openBinaryFile testFile ReadMode >>= B.hGetContents >>= \b -> do
  let rs = runGet (go True) b
  return rs

On Saturday 17 July 2010 05:49:47, MAN wrote:
I'm sorry if this matter has already been discussed, but I'm going nuts here. Attached is the code for a small program, an uber-simplification of something I'm trying to do which would gain enormously from lazy serialization. The code, however, is broken... It runs, and does the job, but it does so strictly.
The Get monad has been made strict in binary-0.5. If you need lazy behaviour, you can try binary-0.4.4 (or earlier), or you could write your own lazy Get wrapper using runGetState (won't be too much fun).
HTH,
Daniel
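A sketch of the second option Daniel mentions, under stated assumptions: it uses runGetOrFail, the non-deprecated counterpart of runGetState in newer releases of binary (it may not exist in the binary-0.5 of this thread), and lazyMany and checks are hypothetical names. Each step decodes a single byte with an ordinary strict Get, and the recursion on the leftover input sits behind (:), so the results list is produced lazily even though Get itself is strict.

```haskell
import Data.Binary.Get (Get, getWord8, runGetOrFail)
import Data.Bits (testBit)
import qualified Data.ByteString.Lazy as BL
import Data.Word (Word8)

-- The predicate from the original post: does bit 0 of the byte match s?
check :: Bool -> Word8 -> Bool
check s w = testBit w 0 == s

-- Hypothetical lazy wrapper: run a small, strict Get once, emit its
-- result, and recurse on the leftover input. The recursive call is only
-- evaluated when the consumer asks for the next list element.
lazyMany :: Get a -> BL.ByteString -> [a]
lazyMany g input
  | BL.null input = []
  | otherwise = case runGetOrFail g input of
      Left _ -> []                        -- decoding failed: end the stream
      Right (rest, _, x) -> x : lazyMany g rest

-- Counterpart of the post's `go`, built on the lazy wrapper.
checks :: Bool -> BL.ByteString -> [Bool]
checks s = lazyMany (check s <$> getWord8)

main :: IO ()
main = do
  print (checks True (BL.pack [1, 2, 3, 4]))
  -- Only the first byte of this infinite input is ever decoded.
  print (head (checks True (BL.repeat 1)))
```

The second line of main is something the post's go cannot do: with a strict Get, liftM2 (:) (checker s) (go s) has to run go s to the end of the input before the first cons cell exists, which is why even testIn traversed the whole file.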

That's what I get for not keeping up with the packages. I wish I had asked sooner (I gotta start checking #haskell).
Thank you, Daniel.
On Sat, 17-07-2010 at 14:10 +0200, Daniel Fischer wrote:
The Get monad has been made strict in binary-0.5. If you need lazy behaviour, you can try binary-0.4.4 (or earlier), or you could write your own lazy Get wrapper using runGetState (won't be too much fun).
HTH,
Daniel
participants (4)
- Benjamin Edwards
- Daniel Fischer
- Dean Herington & Elizabeth Lacey
- MAN