question about conduit source

Hello, cafe.
Is it possible to read data from several concurrent sources, i.e. to read data from each source as soon as it becomes available? For example:
    runResourceT $ (source1 stdin $= CL.map Left)
               >=> (source2 handle $= CL.map Right)
                $= application
                $$ sink
where >=> stands for concurrent combination of sources.
It would be good if the sources could be of different types (handle, STM channel, etc.).
So far I have found no good way to handle this situation other than using STM channels to collect the data:
    source1 ---+        |
               |  sink  |                    output sink
               +---] Channel [-------> application ----->]
               |            source
    source2 ---+        |
From this point of view the application receives concurrent data, but this implementation requires an additional thread per data-processing task. In many cases it also requires running an additional runResourceT (see the example below).
Are there any possible simplifications, or ideas on how to implement the (>=>) operator?
Example:
I have the following code in my network-conduit based application:
    main :: IO ()
    main = do
        pool <- createDBPool "..." 10
        let r = ServerInit pool
        forkIO $ forever clientConsole -- read channel list and send "Left"
        flip runReaderT r $ runTCPServer (ServerSettings 3500 Nothing) (protoServer)

    myServer src sink = do
        ch <- liftIO $ atomically $ newTBMChan 16
        initState <- lift $ ask
        _ <- liftIO $ fork . (flip runReaderT initState) $ runResourceT $
                 src $= C.sequence decode $= CL.map Right $$ sinkTBMChan ch
        sourceTBMChan ch $= process $= C.sequence encode $$ sinkHandle stdout
However, in this situation I don't know whether all resources are guaranteed to be freed, because I'm running an additional ResourceT inside the main ResourceT scope.
So, can you advise whether it is possible to build concurrent sources with the library as currently implemented? If it is not possible but worth implementing, could I write those functions myself? And is it correct to call runResourceT inside another ResourceT?
--
Best regards,
Alexander V Vershilov

On Tue, Feb 28, 2012 at 6:04 PM, Alexander V Vershilov
> From this point of view the application receives concurrent data, but this implementation requires an additional thread per data-processing task. In many cases it also requires running an additional runResourceT (see the example below).
There's not really any way to do what you're looking to do *without* spawning a separate thread (or using some evented system directly, but I'm assuming that's not the case). If what you're looking to do is block until data is available from source1, and block until data is available from source2, you're going to have to use separate threads and some kind of synchronization. STM Channels seem like a good fit, and normal Chans would probably work as well.
Clark Gaebel has already put together stm-conduit[1], maybe he would be interested in adding some additional functions for this use case.
[1] http://hackage.haskell.org/packages/archive/stm-conduit/0.2.2.1/doc/html/Dat...
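The "separate threads plus a channel" approach described above can be sketched with nothing but base. This is a minimal illustration, not conduit code: the name mergeTwo is hypothetical, finite lists stand in for the two sources, and a plain Chan is used in place of an STM channel.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- | Run two producers in their own threads and collect everything they
-- emit through one shared channel, tagging items with Left or Right.
-- (Hypothetical helper: the lists stand in for real sources.)
mergeTwo :: [a] -> [b] -> IO [Either a b]
mergeTwo xs ys = do
  ch <- newChan
  -- One lightweight thread per producer; each only writes to the channel.
  _ <- forkIO $ forM_ xs (writeChan ch . Left)
  _ <- forkIO $ forM_ ys (writeChan ch . Right)
  -- The consumer blocks on the channel until the next item arrives,
  -- whichever producer it comes from.
  replicateM (length xs + length ys) (readChan ch)
```

The relative order of Left and Right items depends on scheduling, but items from the same producer keep their original order because each thread writes sequentially into a FIFO channel.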
> However, in this situation I don't know whether all resources are guaranteed to be freed, because I'm running an additional ResourceT inside the main ResourceT scope.
You can nest ResourceT as much as you want. Each time you call runResourceT, the resources allocated in that block will be freed. I haven't analyzed your code in detail, but it seems fine to me. The only real way you can stop ResourceT from freeing resources is by never triggering the final release, which can be done by either:
1. Having your entire application live inside ResourceT. In such a case, your resources will still be freed, it will just happen at the very end of your application.
2. Using resourceForkIO and letting the child threads live indefinitely.
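The nesting behavior described above can be observed with a small experiment. This is a sketch that assumes the present-day resourcet package (Control.Monad.Trans.Resource with allocate and runResourceT), whose API differs from the 2012 version discussed in this thread; nestedDemo and the log messages are made up for illustration.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (allocate, runResourceT)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- | Record the order in which an outer and an inner ResourceT block
-- acquire and release their (dummy) resources.
nestedDemo :: IO [String]
nestedDemo = do
  logRef <- newIORef []
  let say msg = modifyIORef' logRef (msg :)
  runResourceT $ do
    _ <- allocate (say "acquire outer") (\_ -> say "release outer")
    -- A nested runResourceT: its resources are released when this inner
    -- block ends, independently of the outer block.
    liftIO $ runResourceT $ do
      _ <- allocate (say "acquire inner") (\_ -> say "release inner")
      liftIO (say "inner body")
    liftIO (say "outer body after inner block")
  -- The log was built newest-first, so reverse it into chronological order.
  reverse <$> readIORef logRef
```

In the resulting log, "release inner" appears before "outer body after inner block", and "release outer" comes last.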
_______________________________________________ Haskell-Cafe mailing list Haskell-Cafe@haskell.org http://www.haskell.org/mailman/listinfo/haskell-cafe

First of all, I'd probably name that operator >=<, since >=> is Kleisli composition in Control.Monad.
Second, you're going to need new threads for this, since you'll be reading from two sources concurrently. This isn't as big a problem as you might think, because Haskell threads are dirt cheap, orders of magnitude cheaper than pthread threads. If you're using multiple threads with conduits, I just wrote a library to help you out with that! As Michael already mentioned, stm-conduit could do this synchronization for you. This turns your >=< function into:
    infixl 5 >=<
    (>=<) :: ResourceIO m => Source m a -> Source m a -> ResourceT m (Source m a)
    sa >=< sb = do
        c <- liftIO . atomically $ newTMChan
        _ <- resourceForkIO $ sa $$ sinkTMChan c
        _ <- resourceForkIO $ sb $$ sinkTMChan c
        return $ sourceTMChan c
which returns a new source, combining two sources.
This can further be generalized to combining any number of sources:
    mergeSources :: ResourceIO m => [Source m a] -> ResourceT m (Source m a)
    mergeSources sx = do
        c <- liftIO . atomically $ newTMChan
        mapM_ (\s -> resourceForkIO $ s $$ sinkTMChan c) sx
        return $ sourceTMChan c
Hope this helps somewhat,
- clark
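One design question when merging any number of sources is when the merged stream should end. The following base-only sketch assumes the desired behavior is to stop only after every input has finished. It is not how stm-conduit is implemented: mergeAll is a hypothetical name, finite lists stand in for sources, and a Nothing marker per producer signals completion.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM_)

-- | Merge any number of producers (finite lists stand in for sources).
-- Each worker thread sends 'Nothing' when it is done; the consumer stops
-- only after it has seen one 'Nothing' per producer.
mergeAll :: [[a]] -> IO [a]
mergeAll producers = do
  ch <- newChan
  forM_ producers $ \items -> forkIO $ do
    forM_ items (writeChan ch . Just)
    writeChan ch Nothing -- end-of-stream marker for this producer
  collect ch (length producers)
  where
    -- Read until every producer has reported completion.
    collect :: Chan (Maybe a) -> Int -> IO [a]
    collect _ 0 = return []
    collect ch remaining = do
      next <- readChan ch
      case next of
        Nothing -> collect ch (remaining - 1)
        Just x  -> (x :) <$> collect ch remaining
```

Counting completion markers keeps the channel itself simple; a closable channel with a reference count would be another way to reach the same behavior.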

Finally, I've uploaded a new version of stm-conduit [1] with these
combinators included. You should "cabal update" and then "cabal install
stm-conduit" to get the latest version, and now you can vertically compose
your sources!
Regards,
- clark
[1] http://hackage.haskell.org/package/stm-conduit-0.2.3.0

Hello.
Naming the operator >=< instead of >=> is a good idea. These functions look very good and will make the code easier to understand. I'll also try using a non-STM channel (as Michael advised), because for this task I don't need the full power of STM.
Thanks for the response.
--
Alexander
participants (3)
- Alexander V Vershilov
- Clark Gaebel
- Michael Snoyman