Yet another Conduit question

I am working with bulk sources and sinks, that is, with types like:
Source m [a]
Sink [a] m ()
The problem is that I would like to work on individual values in my conduit. I can have this:
concat :: (Monad m) => Conduit [a] m a
concat = awaitForever (mapM_ yield)
But how can I do it the other way around? I suppose it involves pattern matching on the different states my conduit might be in. But is it even possible to do it in a "non-blocking" way, that is, concatenate data while there is something to read (up to a certain threshold), and send it as soon as there is nothing left to read? Or doesn't that make any sense in the context of Conduits (in the sense that this conduit would be re-checked for input before the upstream conduits have a chance to run)?
Another approach would be to have a map equivalent:
conduitMap :: Conduit i m o -> Conduit [i] m [o]
But I am not sure how to do this either...

Firstly, what's the use case that you want to deal with lists? If it's for
efficiency, you'd probably be better off using a Vector instead.
But I think the inverse of `concat` is `singleton = Data.Conduit.List.map
return`, or `awaitForever $ yield . return`, using the list instance for
Monad. Your conduitMap could be implemented then as:
conduitMap conduit = concat =$= conduit =$= singleton
Michael
On Thu, Jan 31, 2013 at 5:12 PM, Simon Marechal
_______________________________________________ Haskell-Cafe mailing list Haskell-Cafe@haskell.org http://www.haskell.org/mailman/listinfo/haskell-cafe

On 02/01/2013 05:21 AM, Michael Snoyman wrote:
Firstly, what's the use case that you want to deal with lists? If it's for efficiency, you'd probably be better off using a Vector instead.
That is a good point, and I wanted to go that way, but was not sure it would help me a lot here. My use case is for services where there is a "bulk" API, such as Redis pipelining or Elasticsearch bulk inserts. The network round-trip gains would far exceed those from a List to Vector conversion.
But I think the inverse of `concat` is `singleton = Data.Conduit.List.map return`, or `awaitForever $ yield . return`, using the list instance for Monad. Your conduitMap could be implemented then as:
conduitMap conduit = concat =$= conduit =$= singleton
I can see how to do singleton, but that would gain me ... singletons. That means I could not exploit a bulk API.
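Simon's objection can be seen in a toy model that treats a stream as a plain Haskell list of chunks. This is an assumption for illustration, not the conduit API: `concatS`, `singletonS` and `conduitMapS` are hypothetical names mirroring Michael's `concat`, `singleton` and `conduitMap`.

```haskell
-- Toy model: a stream of bulk chunks is a list of lists.
-- These names are hypothetical stand-ins, not conduit functions.

-- Flatten bulk chunks into individual values (like Michael's `concat`).
concatS :: [[a]] -> [a]
concatS = concat

-- Wrap every value in its own one-element chunk (like `singleton`).
singletonS :: [o] -> [[o]]
singletonS = map pure

-- Run a per-element stage over a bulk stream, mirroring
-- `conduitMap conduit = concat =$= conduit =$= singleton`.
conduitMapS :: ([i] -> [o]) -> [[i]] -> [[o]]
conduitMapS stage = singletonS . stage . concatS
```

For example, `conduitMapS (map (* 2)) [[1, 2], [3]]` evaluates to `[[2], [4], [6]]`: the original grouping is lost, so every bulk request downstream would carry a single item.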

On Fri, Feb 1, 2013 at 8:42 AM, Simon Marechal
I can see how to do singleton, but that would gain me ... singletons. That means I could not exploit a bulk API.
So you're saying you want to keep the same grouping that you had originally? Or do you want to batch up a certain number of results? There are lots of ways of approaching this problem, and the types don't imply nearly enough to determine what you're hoping to achieve here.
Michael

On 01/02/2013 08:21, Michael Snoyman wrote:
So you're saying you want to keep the same grouping that you had originally? Or do you want to batch up a certain number of results? There are lots of ways of approaching this problem, and the types don't imply nearly enough to determine what you're hoping to achieve here.
Sorry for not being clear. I would like to group them "as much as possible", that is up to a certain limit, and also within a "time threshold". I believe that the conduit code will be called only when something happens in the conduit, so an actual timer would be useless (unless I handle this at the source perhaps, and propagate "ticks"). That is why in my first message I talked about stacking things into the list until the conduit has no more input available, or a maximum size is reached, but was not sure this even made sense.

I guess you could use the Flush datatype [1] depending on how your
data is generated.
Cheers,
[1] http://hackage.haskell.org/packages/archive/conduit/0.5.4.1/doc/html/Data-Co...
On Fri, Feb 1, 2013 at 6:28 AM, Simon Marechal
-- Felipe.
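Felipe's suggestion can be sketched with a local `Flush` type that mirrors the constructors of conduit's `Flush` (`Chunk a` and `Flush`). The grouping function `batchFlush` is hypothetical and works on plain lists rather than a real `Sink`; it cuts a batch whenever it sees `Flush` or when the batch reaches a size limit, which is the "up to a certain limit, and send as soon as nothing is left" behaviour Simon describes.

```haskell
-- Local stand-in mirroring the constructors of conduit's Flush type.
data Flush a = Chunk a | Flush
  deriving (Eq, Show)

-- Hypothetical helper: regroup a stream of Flush values into bulk
-- batches of at most `limit` elements. A batch is emitted early
-- whenever a Flush marker arrives; empty batches are dropped.
batchFlush :: Int -> [Flush a] -> [[a]]
batchFlush limit = go 0 []
  where
    -- `n` counts the elements in `acc`; `acc` holds them in reverse.
    go _ acc [] = emit acc []
    go _ acc (Flush : rest) = emit acc (go 0 [] rest)
    go n acc (Chunk x : rest)
      | n + 1 >= limit = emit (x : acc) (go 0 [] rest)
      | otherwise = go (n + 1) (x : acc) rest

    -- Emit the current batch (restoring input order) unless it is empty.
    emit [] more = more
    emit acc more = reverse acc : more
```

With a limit of 3, `[Chunk 1, Chunk 2, Flush, Chunk 3, Chunk 4, Chunk 5, Chunk 6, Flush, Flush]` becomes `[[1, 2], [3, 4, 5], [6]]`. The result is produced lazily, so each batch is available as soon as its `Flush` or size limit is reached.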

I think this is probably the right approach. However, there's something
important to point out: flushing based on timing issues must be handled
*outside* of the conduit functionality, since by design conduit will not
allow you to (for example) run `await` for up to a certain amount of time.
You'll probably need to do this outside of your conduit chain, in the
initial Source. It might look something like this:
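yourSource = do
    -- timeout (System.Timeout) runs in IO, so it must be lifted into the Source
    mx <- liftIO $ timeout somePeriod myAction
    -- a value read in time becomes a Chunk; a timeout becomes a Flush marker
    yield $ maybe Flush Chunk mx
    yourSource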
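The timeout half of this idea can be tried with nothing but `base`: `System.Timeout.timeout` returns `Nothing` when the action does not finish in time, and `maybe Flush Chunk` turns that into a flush marker. `pollOnce`, `pollMany` and `slowRead` are hypothetical names, and the local `Flush` type only mirrors conduit's constructors; inside a real `Source` the `timeout` call would also need `liftIO`.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (replicateM)
import System.Timeout (timeout)

-- Local stand-in mirroring the constructors of conduit's Flush type.
data Flush a = Chunk a | Flush
  deriving (Eq, Show)

-- Hypothetical helper: run one read with a deadline (in microseconds).
-- A read that completes yields Chunk; a timeout yields Flush, telling
-- downstream stages that now is a good moment to send a partial batch.
pollOnce :: Int -> IO a -> IO (Flush a)
pollOnce micros action = maybe Flush Chunk <$> timeout micros action

-- Repeat pollOnce a fixed number of times, standing in for the
-- recursive `yourSource` loop.
pollMany :: Int -> Int -> IO a -> IO [Flush a]
pollMany count micros action = replicateM count (pollOnce micros action)

-- A deliberately slow read, used to provoke a timeout.
slowRead :: Int -> a -> IO a
slowRead micros x = threadDelay micros >> pure x
```

A real source would block on its network connection instead of `slowRead`; the point is only that the timing decision lives at the source, outside the conduit chain.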
On Sun, Feb 3, 2013 at 5:06 PM, Felipe Almeida Lessa wrote:

While on the subject of conduits and timing, I'm using the following
conduit to add elapsed timing information:
timedConduit :: MonadResource m => forall l o u . Pipe l o o u m (u, NominalDiffTime)
timedConduit = bracketP getCurrentTime (\_ -> return ()) inner
  where
    inner st = do
      r <- awaitE
      case r of
        Right x -> yield x >> inner st
        Left u -> deltaTime st >>= \t -> return (u, t)
    deltaTime st = liftIO $ flip diffUTCTime st <$> getCurrentTime
I'm aware that this is primarily timing the downstream (and ultimately the
Sink) more than the upstream, and I'm using the bracketP to attempt to
delay the acquisition of the initial time (st) until the first downstream
request for data.
I would appreciate any other insights regarding concerns, issues, or
oddities that I might encounter with the above.
Thanks,
Kevin
On Mon, 04 Feb 2013 02:25:11 -0700, Michael Snoyman
-- -KQ

Hmm, that's an interesting trick. I can't say that I ever thought bracketP
would be used in that way. The only change I might recommend is using
addCleanup[1] instead, which doesn't introduce the MonadResource constraint.
Michael
[1]
http://haddocks.fpcomplete.com/fp/7.4.2/2012-12-11/conduit/Data-Conduit-Inte...
On Mon, Feb 4, 2013 at 4:37 PM, Kevin Quick

On 03/02/2013 16:06, Felipe Almeida Lessa wrote:
I guess you could use the Flush datatype [1] depending on how your data is generated.
Thank you for this suggestion. I tried to do exactly this by modifying my bulk Redis source so that it can time out and send empty lists [1]. Then I wrote a few helper conduits [2], such as:
concatFlush :: (Monad m) => Integer -> Conduit [a] m (Flush a)
which converts a stream of [a] into a stream of (Flush a), sending Flush whenever it encounters an empty list or has sent a tunable amount of data downstream.
I finally modified my examples [3]. I then realized it would be nice to have fmap for conduits (but I am not sure how to write such a type signature). Suggestions are welcome!
[1] https://github.com/bartavelle/hslogstash/commit/663bf8f5e6058b476c9ed9b5c9cf...
[2] https://github.com/bartavelle/hslogstash/blob/master/Data/Conduit/Misc.hs
[3] https://github.com/bartavelle/hslogstash/blob/master/examples/RedisToElastic...
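A pure-list model of the behaviour described for `concatFlush` may help pin down its semantics. This is not the code from the hslogstash repository: the name `concatFlushL`, the use of `Int` for the threshold, and resetting the counter after every `Flush` are assumptions, and `Flush` is again a local stand-in for conduit's type.

```haskell
-- Local stand-in mirroring the constructors of conduit's Flush type.
data Flush a = Chunk a | Flush
  deriving (Eq, Show)

-- Hypothetical model of concatFlush: flatten bulk lists into Chunks,
-- emitting Flush when an empty list arrives (the source timed out)
-- or after every `threshold` Chunks.
concatFlushL :: Int -> [[a]] -> [Flush a]
concatFlushL threshold = go 0
  where
    -- `sent` counts the Chunks emitted since the last Flush.
    go _ [] = []
    go _ ([] : rest) = Flush : go 0 rest
    go sent (xs : rest) = chunks sent xs rest

    -- Emit the elements of one non-empty list, flushing at the threshold.
    chunks sent [] rest = go sent rest
    chunks sent (x : xs) rest
      | sent + 1 >= threshold = Chunk x : Flush : chunks 0 xs rest
      | otherwise = Chunk x : chunks (sent + 1) xs rest
```

For instance, `concatFlushL 2 [[1, 2, 3], [], [4]]` yields `[Chunk 1, Chunk 2, Flush, Chunk 3, Flush, Chunk 4]`. Keeping a separate `chunks` helper matters: an input list that has been fully consumed must not be mistaken for an empty list sent by the source.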

On Mon, Feb 4, 2013 at 3:47 PM, Simon Marechal
I finally modified my examples [3]. I then realized it would be nice to have fmap for conduits (but I am not sure how to write such a type signature). Suggestions are welcome!
Actually, `fmap` already exists on the Pipe datatype; it just probably doesn't do what you want. It modifies the return value, which is only relevant for Sinks. What you are probably looking for is mapOutput [1].
Michael
[1] https://haskell.fpcomplete.com/hoogle?q=mapOutput
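Michael's distinction between `fmap` and `mapOutput` can be illustrated with a toy model in which a finished producer is just a pair of its outputs and its return value. This is an assumption for illustration only; `Producer`, `mapOutputP` and `sample` are hypothetical and much simpler than conduit's `Pipe`. The standard `Functor` instance for pairs maps only the second component, just as `fmap` on a `Pipe` touches only the return value.

```haskell
-- Toy model: the outputs a producer yielded, plus its final result.
type Producer o r = ([o], r)

-- Hypothetical analogue of mapOutput: transform every yielded value,
-- leaving the result untouched.
mapOutputP :: (o1 -> o2) -> Producer o1 r -> Producer o2 r
mapOutputP f (outs, r) = (map f outs, r)

-- A sample producer that yields three numbers and returns their count.
sample :: Producer Int Int
sample = ([1, 2, 3], 3)
```

Here `fmap (* 10) sample` is `([1, 2, 3], 30)`, while `mapOutputP (* 10) sample` is `([10, 20, 30], 3)`: only the second changes what downstream stages would receive.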
participants (4)
- Felipe Almeida Lessa
- Kevin Quick
- Michael Snoyman
- Simon Marechal