
Finally, I've uploaded a new version of stm-conduit [1] with these
combinators included. You should "cabal update" and then "cabal install
stm-conduit" to get the latest version, and now you can vertically compose
your sources!
Regards,
- clark
[1] http://hackage.haskell.org/package/stm-conduit-0.2.3.0
On Tue, Feb 28, 2012 at 2:58 PM, Clark Gaebel wrote:
First of all, I'd probably name that operator >=<, since >=> is Kleisli composition in Control.Monad.
Second, you're going to need new threads for this, since you'll be reading from two sources concurrently. This isn't as big a problem as you might think, because Haskell threads are dirt cheap, orders of magnitude cheaper than pthread threads. If you're using multiple threads with conduits, I just wrote a library to help you out with that! As Michael already mentioned, stm-conduit could do this synchronization for you. This turns your >=< function into:
    infixl 5 >=<
    (>=<) :: ResourceIO m => Source m a -> Source m a -> ResourceT m (Source m a)
    sa >=< sb = do
        c <- liftIO . atomically $ newTMChan
        _ <- resourceForkIO $ sa $$ sinkTMChan c
        _ <- resourceForkIO $ sb $$ sinkTMChan c
        return $ sourceTMChan c
which returns a new source, combining two sources.
This can further be generalized to combining any number of sources:
    mergeSources :: ResourceIO m => [Source m a] -> ResourceT m (Source m a)
    mergeSources sx = do
        c <- liftIO . atomically $ newTMChan
        mapM_ (\s -> resourceForkIO $ s $$ sinkTMChan c) sx
        return $ sourceTMChan c
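To make the underlying idea concrete without depending on any particular conduit version, here is a minimal sketch that uses only base and stm. It is not the stm-conduit implementation: mergeProducers and drain are hypothetical names, and a "source" is modelled simply as an IO action that is handed an emit callback. Each producer runs in its own thread and writes into one shared queue, and a counter of running producers tells the consumer when the merged stream has ended.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Monad (forM_)

-- | Hypothetical helper (not part of stm-conduit): run every producer in
-- its own thread, funnelling the values it emits into one shared queue.
-- The result is a "pull" action: Just x for the next value, or Nothing
-- once every producer has finished and the queue has been drained.
mergeProducers :: [(a -> IO ()) -> IO ()] -> IO (IO (Maybe a))
mergeProducers producers = do
    queue   <- newTQueueIO
    running <- newTVarIO (length producers)
    forM_ producers $ \produce -> forkIO $
        -- Decrement the counter even if the producer throws, so the
        -- consumer is never left waiting on a dead thread.
        produce (atomically . writeTQueue queue)
            `finally` atomically (modifyTVar' running (subtract 1))
    return $ atomically $ do
        next <- tryReadTQueue queue
        case next of
            Just x  -> return (Just x)
            Nothing -> do
                n <- readTVar running
                -- Empty queue and no live producers: the stream is over.
                -- Otherwise block until something changes.
                if n == 0 then return Nothing else retry

-- | Collect everything a pull action yields (for demonstration only).
drain :: IO (Maybe a) -> IO [a]
drain pull = do
    next <- pull
    case next of
        Nothing -> return []
        Just x  -> (x :) <$> drain pull
```

Two producers of different element types can be merged by tagging them first, for example \emit -> mapM_ (emit . Left) [1, 2, 3] and \emit -> mapM_ (emit . Right) "ab". The order of values from any single producer is preserved, while the interleaving between producers depends on scheduling.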
Hope this helps somewhat,
- clark
On Tue, Feb 28, 2012 at 11:04 AM, Alexander V Vershilov <alexander.vershilov@gmail.com> wrote:
Hello, cafe.
Is it possible to read data from different concurrent sources, i.e. read data from each source as soon as it becomes available, e.g.
    runResourceT $ (source1 stdin $= CL.map Left)
               >=> (source2 handle $= CL.map Right)
                $= application
                $$ sink
where >=> stands for concurrent combining of sources.
It would be good if the sources could be of different types (a handle, an STM channel, etc.).
So far I've found no good way to handle this situation, except for using STM channels to collect the data:
    source1 ---+
               |   sink            source                    output sink
               +---] Channel [-------> application ----->]
               |
    source2 ---+
From this point of view the application receives concurrent data, but this implementation requires an additional thread for each source being processed. In many cases it will also require running an additional runResourceT (see the example below).
So are there any possible simplifications, or ideas on how to implement the (>=>) operator?
Example:
I have the following code in my network-conduit based application:
    main :: IO ()
    main = do
        pool <- createDBPool "..." 10
        let r = ServerInit pool
        forkIO $ forever clientConsole -- read channel list and send "Left"
        flip runReaderT r $ runTCPServer (ServerSettings 3500 Nothing) (protoServer)

    myServer src sink = do
        ch <- liftIO $ atomically $ newTBMChan 16
        initState <- lift $ ask
        _ <- liftIO $ fork . (flip runReaderT initState) $ runResourceT $
               src $= C.sequence decode $= CL.map Right $$ sinkTBMChan ch
        sourceTBMChan ch $= process $= C.sequence encode $$ sinkHandle stdout
But in this situation I don't know whether all resources are guaranteed to
be freed, because I'm running an additional ResourceT inside the main
ResourceT scope.
So can you advise whether it is possible to build concurrent sources with the library as currently implemented? If it is not possible but worth implementing, could I write those functions myself? And is it correct to call runResourceT inside another ResourceT?
--
Best regards,
Alexander V Vershilov
_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe