
Hello Haskell-Cafe,

I've written a small program which demonstrates how map/reduce may be implemented in Haskell. It counts the number of words in a file, splitting it into 64 KB blocks processed by two threads. Their results are combined by another two threads. How may it be made better? In particular, is it strict in the values sent over channels?

{-# LANGUAGE BangPatterns #-}

import Control.Concurrent
import Control.Concurrent.Chan
import Control.Monad
import Data.IORef
import Data.ByteString.Char8 as B hiding (length)
import System.Environment
import System.IO

main = do
  (file:_) <- getArgs
  h <- openBinaryFile file ReadMode

  map    <- newChan
  reduce <- newChan
  result <- newChan

  replicateM_ 2 (forkIO$ mapThread map reduce)
  replicateM_ 2 (forkIO$ reduceThread reduce result)

  jobs <- new 0
  untilM (hIsEOF h) $ do
    str <- B.hGet h 65536
    writeChan map str
    jobs += 1

  jobs' <- val jobs
  writeChan reduce (0,-jobs')

  res <- readChan result
  print res

mapThread map reduce =
  forever $ do
    str <- readChan map
    let !sum = length (B.words str)
    writeChan reduce (sum,1)

reduceThread reduce result =
  forever $ do
    (sum1,n1) <- readChan reduce
    (sum2,n2) <- readChan reduce
    let (!sum,!n) = (sum1+sum2,n1+n2)
    case n of
      0 -> writeChan result sum
      _ -> writeChan reduce (sum,n)

untilM cond action = do
  deny <- cond
  unless deny $ do
    action
    untilM cond action

forever action = action >> forever action

infixl 0 =:, +=
new = newIORef
val = readIORef
a=:b = writeIORef a b
a+=b = modifyIORef a (\a->a+b)

-- 
Best regards,
 Bulat                          mailto:Bulat.Ziganshin@gmail.com
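On the strictness question: a `Chan` from base stores whatever it is given without evaluating it, so the sender decides whether a value or an unevaluated expression travels through the channel. The following is a minimal sketch of forcing the value on the sending side using only base and bytestring. `writeChanStrict` and `countWorker` are hypothetical names introduced for illustration, not part of any library.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Exception (evaluate)
import qualified Data.ByteString.Char8 as B

-- | Hypothetical helper: force a value to weak head normal form in the
-- sending thread, then put it into the channel.
writeChanStrict :: Chan a -> a -> IO ()
writeChanStrict ch x = evaluate x >>= writeChan ch

-- | Read one chunk, count its words, and send the already-computed count.
-- The counting happens in this worker, not in whoever reads the result.
countWorker :: Chan B.ByteString -> Chan Int -> IO ()
countWorker input output = do
  chunk <- readChan input
  writeChanStrict output (length (B.words chunk))

main :: IO ()
main = do
  input  <- newChan
  output <- newChan
  _ <- forkIO (countWorker input output)
  writeChan input (B.pack "map reduce in Haskell")
  readChan output >>= print   -- prints 4
```

Weak head normal form is full evaluation for an `Int`, but for a pair such as `(sum,1)` it only forces the pair constructor. The bang pattern on `sum` in `mapThread` above is what forces the count itself before the pair is sent.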

I've written a small program which demonstrates how map/reduce may be implemented in Haskell. It counts the number of words in a file, splitting it into 64 KB blocks processed by two threads. Their results are combined by another two threads. How may it be made better?
Hello, I do not know anything about
{-# LANGUAGE BangPatterns #-}
But your program logic certainly needs some polishing. Your idea is to keep track of the number of chunks
  jobs <- new 0
  untilM (hIsEOF h) $ do
    str <- B.hGet h 65536
    writeChan map str
    jobs += 1

  jobs' <- val jobs
  writeChan reduce (0,-jobs')
and to collect partial sums, until all jobs are done.
reduceThread reduce result =
  forever $ do
    (sum1,n1) <- readChan reduce
    -- (*)
    (sum2,n2) <- readChan reduce
    let (!sum,!n) = (sum1+sum2,n1+n2)
    case n of
      0 -> writeChan result sum
      _ -> writeChan reduce (sum,n)
I don't like the 'forever' here, especially since it prevents the program from being re-used. The real problem is the possibility of deadlocks: suppose that all chunks are done, reducer A is directly before (*) and reducer B is directly after (*). Now main sends the message (0,-jobs) and reducer A receives it. Then both reducers are in the state directly after (*), each waiting for a second message, but no more messages are generated: a deadlock.

The classical solution is to write an explicit END message into the channel and to keep track of the number of running mappers/reducers rather than the number of jobs. A simple attempt is

mappers  = 5
reducers = 3

main = do
  (file:_) <- getArgs
  h <- openBinaryFile file ReadMode

  map <- newChan
  red <- newChan
  res <- newChan

  replicateM_ mappers  (forkIO $ mapThread map red)
  replicateM_ reducers (forkIO $ reduceThreadL red res 0)

  untilM (hIsEOF h) $ B.hGet h 65536 >>= writeChan map . Left

  writeChan map (Right mappers)
  -- explicit end-of-input message

  readChan res >>= print

mapThread map red =
  readChan map >>= \ msg -> case msg of
    Left str -> writeChan red (Left $ length $ B.words str) >> mapThread map red
    Right 1  -> writeChan red (Right reducers)
    -- ^ explicit end-of-reduce message
    Right mappers_running -> writeChan map (Right $ mappers_running-1)

reduceThreadL red res sum =
  readChan red >>= \ msg -> case msg of
    Left sum1 -> reduceThreadL red res (sum+sum1)
    -- ^ a new partial sum: reduce locally
    Right 1   -> writeChan res sum
    -- ^ I'm the last running reducer: produce the result now
    Right reducers_running -> writeChan red (Left sum) >> writeChan red (Right $ reducers_running-1)
    -- ^ reduce phase has ended, I'm not the last reducer:
    --   send partial reduction result and go home

untilM cond action = cond >>= flip unless (action >> untilM cond action)

It is quite easy to see that the program always stops, producing the correct result. There is no 'forever', so the program is re-usable.
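To make the re-usability point concrete, the END-message scheme can be packaged as an ordinary IO function. This is only a sketch under assumptions: it takes in-memory chunks instead of reading a file, the names `mapper`, `reducer`, `countWords`, `MapMsg` and `RedMsg` are hypothetical, and both worker counts must be at least 1.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (replicateM_)
import qualified Data.ByteString.Char8 as B

-- Left carries work (or a partial sum); Right n is the END token, where n
-- is the number of workers of that stage that are still running.
type MapMsg = Either B.ByteString Int
type RedMsg = Either Int Int

mapper :: Int -> Chan MapMsg -> Chan RedMsg -> IO ()
mapper reducers inp red = loop
  where
    loop = readChan inp >>= \msg -> case msg of
      Left chunk -> do
        let n = length (B.words chunk)
        n `seq` writeChan red (Left n)           -- send an evaluated count
        loop
      Right 1 -> writeChan red (Right reducers)  -- last mapper: end of reduce input
      Right k -> writeChan inp (Right (k - 1))   -- hand the token to the next mapper

reducer :: Chan RedMsg -> Chan Int -> Int -> IO ()
reducer red out acc = readChan red >>= \msg -> case msg of
  Left n  -> let acc' = acc + n in acc' `seq` reducer red out acc'
  Right 1 -> writeChan out acc                   -- last reducer: publish the result
  Right k -> writeChan red (Left acc) >> writeChan red (Right (k - 1))

-- | Assumes mappers >= 1 and reducers >= 1.
countWords :: Int -> Int -> [B.ByteString] -> IO Int
countWords mappers reducers chunks = do
  inp <- newChan
  red <- newChan
  out <- newChan
  replicateM_ mappers  (forkIO (mapper reducers inp red))
  replicateM_ reducers (forkIO (reducer red out 0))
  mapM_ (writeChan inp . Left) chunks
  writeChan inp (Right mappers)                  -- explicit end-of-input message
  readChan out

main :: IO ()
main = do
  let chunks = map B.pack ["a b c", "d e", "f"]
  countWords 5 3 chunks >>= print   -- prints 6
  countWords 2 1 chunks >>= print   -- prints 6
```

Because every worker thread terminates after handling its END token, `countWords` can be called repeatedly from the same program, here with different worker counts.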
However, it is not exactly the same as the original program, since the reduction results are stored locally and not sent to the channel. If you want to send them, the problem becomes more complex: you have to solve the problem of distributed termination detection. There are algorithms for this, for example some simple token-based ones that need O(P) steps, where P is the number of processes involved. There are also some O(log P) algorithms, which count activation messages and sum up these numbers in two wave fronts. These algorithms are formulated for message-passing environments with the capability to send a message to a specific receiver. I'm not quite sure how to adapt them to the channel-based environment (except by simulating the message-passing environment).

/BR, Mirko Rahn

Just going back to this: the channel issue may be solved by the strict-concurrency package (strict Chans and MVars), and the general problem of distributing arrays seems to be solved more thoroughly by the data parallel array library, which offers map, fold, scanl, filter, zip et al., not just map and reduce. It takes care of forking gang threads and distributing work, and does so with a pure interface.

-- Don

bulat.ziganshin:
Hello Haskell-Cafe,
I've written a small program which demonstrates how map/reduce may be implemented in Haskell. It counts the number of words in a file, splitting it into 64 KB blocks processed by two threads. Their results are combined by another two threads. How may it be made better? In particular, is it strict in the values sent over channels?
participants (3)
- Bulat Ziganshin
- Don Stewart
- Mirko Rahn