
Hello, Simon.
On 31 January 2013 17:24, Simon Marechal wrote:
On 31/01/2013 13:50, Michael Snoyman wrote:
* To simplify, we start off with a call to injectLeftovers. This means that we can entirely ignore the Leftover constructor in the main function.
* Since a Sink will never yield values, we can also ignore the HaveOutput constructor.
* As soon as either of the Sinks terminates, we terminate the other one as well and return the results.
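The three points above can be illustrated with a toy, pure model of a sink. This is a sketch, not conduit's real Pipe type: the `Sink`, `zipSinks`, `sumSink`, `takeSink` and `runSink` below are hypothetical names, there is no underlying monad, and leftovers and output are simply absent from the type (mirroring the first two bullets).

```haskell
-- Toy pure sink model (hypothetical; not conduit's actual Pipe constructors).
-- No Leftover and no HaveOutput: a sink can only finish or ask for input.
data Sink i r
  = Done r                      -- finished, with a result
  | NeedInput (i -> Sink i r) r -- awaiting input; second field is the result at end of input

-- Feed the same input to two sinks. As soon as either one finishes,
-- the other is terminated (given end of input) and both results are returned.
zipSinks :: Sink i a -> Sink i b -> Sink i (a, b)
zipSinks (Done a) (Done b) = Done (a, b)
zipSinks (Done a) (NeedInput _ eb) = Done (a, eb)
zipSinks (NeedInput _ ea) (Done b) = Done (ea, b)
zipSinks (NeedInput fa ea) (NeedInput fb eb) =
  NeedInput (\x -> zipSinks (fa x) (fb x)) (ea, eb)

-- Sums everything it receives.
sumSink :: Sink Int Int
sumSink = go 0
  where
    go acc = NeedInput (\x -> go (acc + x)) acc

-- Collects the first n values, then finishes.
takeSink :: Int -> Sink i [i]
takeSink n = go n []
  where
    go 0 acc = Done (reverse acc)
    go k acc = NeedInput (\x -> go (k - 1) (x : acc)) (reverse acc)

-- Drive a sink with a list of inputs.
runSink :: [i] -> Sink i r -> r
runSink _ (Done r) = r
runSink [] (NeedInput _ r) = r
runSink (x : xs) (NeedInput f _) = runSink xs (f x)

main :: IO ()
main = print (runSink [1 .. 10] (zipSinks sumSink (takeSink 3)))
```

Here `takeSink 3` finishes after three values, so `sumSink` is terminated at that point with the sum so far and the program prints `(6,[1,2,3])`.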
Your gist is extremely informative. I figured it would be something along these lines, but was too scared to try it myself. However, I have realized that my first use case doesn't cover my needs, as I will want to feed an arbitrary set of sinks with any value ...
I started coding right after I sent that mail and wrote this: https://github.com/bartavelle/hslogstash/blob/master/Data/Conduit/Branching....
It is not very elegant, as the "branching" function outputs [Int].
I haven't tested it yet, but it should branch to any number of sinks. Another point that might (or might not) be of interest is running distinct branches on separate threads.
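Since the linked code is truncated, the following does not reproduce it. It is a hypothetical sketch, in a toy pure sink model (`Sink`, `branchSinks`, `route` and `runSink` are invented names, not conduit API), of one sink that branches to any number of sinks, where a routing function returns the `[Int]` indices of the branches that should receive each value.

```haskell
-- Toy pure sink model (hypothetical; not conduit's Pipe type).
data Sink i r
  = Done r                      -- finished, with a result
  | NeedInput (i -> Sink i r) r -- awaiting input; second field is the result at end of input

-- The result a sink yields if it is stopped now.
finish :: Sink i r -> r
finish (Done r) = r
finish (NeedInput _ r) = r

isDone :: Sink i r -> Bool
isDone (Done _) = True
isDone _ = False

-- Send each input to the sinks whose 0-based indices `route` returns.
-- The combined sink finishes once every branch has finished.
branchSinks :: (i -> [Int]) -> [Sink i r] -> Sink i [r]
branchSinks route sinks
  | all isDone sinks = Done (map finish sinks)
  | otherwise = NeedInput step (map finish sinks)
  where
    step x = branchSinks route (zipWith (feed (route x) x) [0 ..] sinks)
    feed targets x k s = case s of
      NeedInput f _ | k `elem` targets -> f x
      _ -> s

-- Drive a sink with a list of inputs.
runSink :: [i] -> Sink i r -> r
runSink _ (Done r) = r
runSink [] (NeedInput _ r) = r
runSink (x : xs) (NeedInput f _) = runSink xs (f x)

sumSink :: Sink Int Int
sumSink = go 0
  where
    go acc = NeedInput (\x -> go (acc + x)) acc

-- Even numbers go to branch 0, odd ones to branch 1,
-- and multiples of 3 additionally go to branch 2.
route :: Int -> [Int]
route x = (if even x then 0 else 1) : [2 | x `mod` 3 == 0]

main :: IO ()
main = print (runSink [1 .. 10] (branchSinks route [sumSink, sumSink, sumSink]))
```

This prints `[30,25,18]`. Unlike a two-sink zip that stops as soon as one side finishes, this sketch waits for every branch, on the assumption that a branch which is rarely routed any value should not end the others.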
You can also consider going the mutable container route if you like. Instead of building a lot of machinery from scratch with MVars, you could use stm-conduit [2]. In fact, that package already contains some merging behavior for sources; it might make sense to ask the author about including unmerging behavior for Sinks.
I did not think of bounded channels. They are indeed a better match than MVars!
I can see it uses resourceForkIO, which I believe is OK for sources that will be used in your 'main' thread. But for multiple Sinks, you need a way to wait for all the Sinks to terminate. I used functions from Control.Concurrent.ParallelIO, but I am not sure that is ideal.
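One way to wait for all branches, using only base and stm, is sketched below. It assumes each branch is a plain IO consumer rather than a conduit Sink, reading from its own bounded TBQueue, with `Nothing` marking the end of the stream; `Branch`, `fanOut`, `sumBranch` and `countBranch` are hypothetical names, and neither stm-conduit nor Control.Concurrent.ParallelIO is used. The producer waits on one MVar per branch, filled by `forkFinally` with either the result or the exception that ended the thread.

```haskell
import Control.Concurrent (forkFinally)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (atomically, newTBQueueIO, readTBQueue, writeTBQueue)
import Control.Exception (throwIO)
import Control.Monad (forM, forM_)

-- A branch reads values with the supplied action until it gets Nothing
-- (end of stream), then returns its result.
type Branch a r = IO (Maybe a) -> IO r

-- Run each branch on its own thread behind a bounded queue, feed every
-- input to every branch, then wait until all branches have returned.
fanOut :: Int -> [a] -> [Branch a r] -> IO [r]
fanOut capacity xs branches = do
  workers <- forM branches $ \branch -> do
    q <- newTBQueueIO (fromIntegral capacity)
    done <- newEmptyMVar
    -- forkFinally passes either the result or the exception to putMVar.
    _ <- forkFinally (branch (atomically (readTBQueue q))) (putMVar done)
    return (q, done)
  -- writeTBQueue blocks (retries) while a branch's queue is full.
  forM_ xs $ \x -> forM_ workers $ \(q, _) -> atomically (writeTBQueue q (Just x))
  forM_ workers $ \(q, _) -> atomically (writeTBQueue q Nothing)
  -- Wait for every branch; re-throw the first failure in branch order.
  mapM (takeMVar . snd) workers >>= mapM (either throwIO return)

sumBranch :: Branch Int Int
sumBranch next = go 0
  where
    go acc = next >>= maybe (return acc) (\x -> go (acc + x))

countBranch :: Branch Int Int
countBranch next = go 0
  where
    go n = next >>= maybe (return n) (\_ -> go (n + 1))

main :: IO ()
main = fanOut 4 [1 .. 100] [sumBranch, countBranch] >>= print
```

This prints `[5050,100]`. The sketch assumes every branch keeps reading until `Nothing`; a branch that returns early would eventually leave the producer blocked once its queue is full.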
Sorry, I sent my first email off the list and have no copy to resend.

If you use stm-conduit, you may run into the following problems:

1) Early close: if you use the modern conduit API (yield, await), be very careful when using $$ on sinkT?Chan, as it will close the channel, and so the receiver will be closed as well.

2) Late close: if you use resourceForkIO, your channel may live too long. However, in most cases you'll be safe with forkIO, unless you share resources that were allocated with resourcet.

3) Pipeline notification: if you map your branches to different threads, you'll need a way to notify that a branch is closed. Closing channels works, but you'll need additional steps to check whether they were closed and then either close the whole pipeline or just forget that channel.

On the other hand, you can use a single pipeline (and thread) for all branches; then you need the approach shown in Michael's gist: run each downstream pipe and save its result, possibly modifying a list.

I'll paste the concept I mailed to you here, though it may not be accurate:

    branch :: [a -> m ()] -- it's better to place a sink here
    branch fs = do
      chs <- mapM (\f -> (,) f <$> newTBMChanIO) fs
      bracket (mapM (\(f, ch) -> forkIO $ sourceTBMChan ch $$ f) chs)
              (mapM_ killThread)
              (\_ -> do x <- await
                        mapM_ (flip writeTBMChan x . snd) chs)

As an additional step, you can call isTBMChanClosed on a channel to check whether its branch is still alive, and filter the channel out if it is not. (The previous version contained a filter function that allowed sending a value only to the branches that need it; however, it's less composable.)

You may also like the iochan-conduit package [1]; it can give better results in some cases, but stm-channels is much more general.

[1] https://github.com/qnikst/iochan-conduit

-- Alexander Vershilov
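The "check whether a branch is still alive and drop its channel" idea from point 3 can be sketched without stm-chans. In the sketch below, `BChan` is a hypothetical closable bounded channel (a TBQueue plus a TVar flag) standing in for TBMChan and isTBMChanClosed, and `writeB`, `endB`, `spawn` and `fanOutClosable` are likewise invented names; branches are plain IO consumers, not conduit Sinks. When a branch returns, its channel is marked closed, so a producer write fails instead of blocking and the branch is dropped from the live list.

```haskell
import Control.Concurrent (forkFinally)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
  (TBQueue, TVar, atomically, newTBQueueIO, newTVarIO, readTBQueue, readTVar, writeTBQueue, writeTVar)
import Control.Exception (SomeException, throwIO)
import Control.Monad (filterM, foldM, forM)

-- A branch reads values until Nothing (end of stream) or until it has had enough.
type Branch a r = IO (Maybe a) -> IO r

-- Hypothetical closable bounded channel: a queue plus a "reader has gone" flag.
data BChan a = BChan (TBQueue (Maybe a)) (TVar Bool)

-- Write a value; returns False once the reader has closed the channel.
-- While the queue is full the transaction retries, and it is woken up
-- either by free space or by the closed flag changing.
writeB :: BChan a -> a -> IO Bool
writeB (BChan q closed) x = atomically $ do
  c <- readTVar closed
  if c then return False else writeTBQueue q (Just x) >> return True

-- Send end of stream, unless the reader is already gone.
endB :: BChan a -> IO ()
endB (BChan q closed) = atomically $ do
  c <- readTVar closed
  if c then return () else writeTBQueue q Nothing

-- Start a branch on its own thread; whenever it returns, its channel is closed.
spawn :: Int -> Branch a r -> IO (BChan a, MVar (Either SomeException r))
spawn capacity branch = do
  q <- newTBQueueIO (fromIntegral capacity)
  closed <- newTVarIO False
  done <- newEmptyMVar
  _ <- forkFinally (branch (atomically (readTBQueue q)))
                   (\r -> atomically (writeTVar closed True) >> putMVar done r)
  return (BChan q closed, done)

-- Feed every input to every live branch, dropping branches whose channel
-- has been closed, then wait for all branches.
fanOutClosable :: Int -> [a] -> [Branch a r] -> IO [r]
fanOutClosable capacity xs branches = do
  workers <- forM branches (spawn capacity)
  live <- foldM (\alive x -> filterM (`writeB` x) alive) (map fst workers) xs
  mapM_ endB live
  mapM (takeMVar . snd) workers >>= mapM (either throwIO return)

-- Stops after k values, which closes its channel early.
sumFirst :: Int -> Branch Int Int
sumFirst k next = go k 0
  where
    go 0 acc = return acc
    go i acc = next >>= maybe (return acc) (\x -> go (i - 1) (acc + x))

sumAll :: Branch Int Int
sumAll next = go 0
  where
    go acc = next >>= maybe (return acc) (\x -> go (acc + x))

main :: IO ()
main = fanOutClosable 2 [1 .. 1000] [sumFirst 3, sumAll] >>= print
```

Here `sumFirst 3` stops after three values. With a capacity of 2 the producer would otherwise block once that queue filled; instead `writeB` returns False, the branch is dropped, and the program prints `[6,500500]`.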