-- See writeTChan and readTChan. I assume readTChan is synchronous :-). writeTChan may be asynchronous, as far as I can tell (I haven't looked deeply).
module Main where
{--
Haskell concurrent (and sometimes parallel) prime Sieve of Eratosthenes
David Leimbach
May 19, 2008
Communicates with typed data channels for sieving using the STM monad (Software Transactional Memory) to
share data between sparks. Sparks are created by forkIO, which can be scheduled to real OS threads.
The algorithm is sloppily contained in sieve :: TChan Int -> TChan Int -> IO ()
which receives a number in the range of [2 .. 10000]. If this is the first number it has received, that
will forever be that spark's number to test for divisibility of subsequent numbers. After this assignment
it writes this value to the reader spark which prints it and waits for the next number to fall out of the
sieve. (which is why we start with 2)
This first sieve-running spark will forkIO one more sieve spark, which will be sent as its first number the
first number not evenly divisible by the value of the current sieve spark.
This process continues until the syncVal is received, which terminates the program and shuts down the reader
as well as the main loop.
--}
import Control.Monad.STM
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM.TChan
import System(getArgs)
-- | Sentinel sent through the channels to mark the end of the number stream.
-- It is negative, so it cannot collide with any candidate in @[2 ..]@.
syncVal :: Int
syncVal = negate 1
-- | One stage of the concurrent sieve pipeline.
--
-- The first number read from @inChan@ becomes this stage's prime: it is
-- written to @outChan@ and a new downstream stage is forked with a fresh
-- channel. Every later number that is not a multiple of that prime is
-- forwarded to the downstream stage; multiples are dropped.
--
-- 'syncVal' is tested /before/ divisibility. @syncVal `mod` p@ is non-zero
-- for every @p >= 2@, so testing divisibility first would forward the
-- sentinel as an ordinary candidate and the stage would block on its input
-- forever. A stage that owns a prime forwards the sentinel downstream and
-- returns; the stage whose first input is the sentinel writes it to
-- @outChan@ and returns without forking, so the sentinel reaches @outChan@
-- only after it has travelled through the whole chain.
sieve :: TChan Int -> TChan Int -> IO ()
sieve inChan outChan = do
  value <- atomically $ readTChan inChan
  if value == syncVal
    then atomically $ writeTChan outChan syncVal
    else do
      atomically $ writeTChan outChan value
      newchan <- atomically newTChan
      _ <- forkIO $ sieve newchan outChan
      filterLoop value newchan
  where
    -- Filter the remaining input against this stage's prime until the
    -- sentinel arrives. (Named so that it no longer shadows
    -- Control.Monad.forever.)
    filterLoop prime next = do
      subsequent <- atomically $ readTChan inChan
      if subsequent == syncVal
        then atomically $ writeTChan next syncVal
        else do
          when (subsequent `mod` prime /= 0) $
            atomically $ writeTChan next subsequent
          filterLoop prime next
-- | Consume the sieve's output.
--
-- Values are read from @chan@ and discarded (the line that would print them
-- is commented out) until 'syncVal' is seen; at that point a single @\'Q\'@
-- is written to @syncChan@ and the action returns.
reader :: TChan Int -> TChan Char -> IO ()
reader chan syncChan = drain
  where
    drain = do
      x <- atomically (readTChan chan)
      if x == syncVal
        then atomically (writeTChan syncChan 'Q')
        else
          -- putStrLn $ show x
          drain
-- | Program entry point.
--
-- Expects the upper bound of the search as the first command-line argument
-- (further arguments are ignored). Forks the 'reader' and the first 'sieve'
-- stage, feeds the candidates @[2 .. limit]@ followed by 'syncVal' into the
-- sieve, then blocks until the reader signals completion on its own channel.
--
-- A missing or non-integer argument fails with a descriptive 'userError'
-- instead of the bare exceptions raised by @head@ and @read@.
main :: IO ()
main = do
  args <- getArgs
  case args of
    [] ->
      ioError (userError "missing argument: expected the upper bound of the prime search")
    (arg : _) ->
      case parseLimit arg of
        Nothing ->
          ioError (userError ("upper bound is not an integer: " ++ show arg))
        Just limit -> run arg limit
  where
    -- Total counterpart of 'read': same grammar (surrounding whitespace is
    -- accepted), but yields Nothing instead of raising on bad input.
    parseLimit :: String -> Maybe Int
    parseLimit s =
      case [n | (n, rest) <- reads s, ("", "") <- lex rest] of
        [n] -> Just n
        _ -> Nothing

    -- Build the pipeline, feed it, and wait for the reader's signal.
    run :: String -> Int -> IO ()
    run arg limit = do
      wChannel <- atomically newTChan
      rChannel <- atomically newTChan
      sChannel <- atomically newTChan
      _ <- forkIO $ reader rChannel sChannel
      _ <- forkIO $ sieve wChannel rChannel
      putStrLn ("Searching for primes up to " ++ arg)
      forM_ [2 .. limit] $ \i -> atomically $ writeTChan wChannel i
      atomically $ writeTChan wChannel syncVal
      -- Block until the reader has seen the sentinel.
      _ <- atomically $ readTChan sChannel
      return ()