Simple way to do something like ArrowChoice.right on a Conduit? (version 1.0.0)

Can I transform a conduit so some values are passed through unchanged, but others go through the conduit? For example:

    right :: Conduit i m o -> Conduit (Either x i) m (Either x o)

This is named after the Control.Arrow combinator of the same name:

    right :: ArrowChoice a => a b c -> a (Either d b) (Either d c)

Here's my use case (simplified): I want to compress data with zlib-conduit, which provides:

    compress :: Conduit (Flush ByteString) m (Flush ByteString)

The Flush wrapper (http://hackage.haskell.org/packages/archive/conduit/latest/doc/html/Data-Con...) lets me flush the compressor so it will yield cached data right away (though hurting compression a little).

But before compressing the data, I want to encode it, using this conduit:

    encode :: Conduit Entry m ByteString

I want to combine these, so that if I send a 'Flush', it bypasses 'encode' and feeds to 'compress':

    compressEncode :: Conduit (Flush Entry) m (Flush ByteString)

Thus, I need a variant of 'encode' that passes 'Flush' along:

    encode' :: Conduit (Flush Entry) m (Flush ByteString)

In my actual program, I don't use Flush, so providing a Conduit combinator just for Flush would not help me.

Is something like 'right' possible to implement with Conduit's public API? Here's an implementation using Data.Conduit.Internal (untested):

    import Control.Monad (liftM)
    import Data.Conduit.Internal (Pipe(..), ConduitM(..), Conduit)

    right :: Monad m => Conduit i m o -> Conduit (Either x i) m (Either x o)
    right = ConduitM . rightPipe . unConduitM

    rightPipe :: Monad m
              => Pipe i i o () m ()
              -> Pipe (Either x i) (Either x i) (Either x o) () m ()
    rightPipe p0 = case p0 of
        HaveOutput p c o -> HaveOutput (rightPipe p) c (Right o)
        NeedInput p c    -> NeedInput p' (rightPipe . c)
          where p' (Left x)  = HaveOutput (rightPipe p0) (return ()) (Left x)
                p' (Right i) = rightPipe $ p i
        Done r           -> Done r
        PipeM mp         -> PipeM $ liftM rightPipe mp
        Leftover p i     -> Leftover (rightPipe p) (Right i)

I'm wondering if we could have a Data.Conduit.Arrow module, which provides a newtype variant of Conduit that implements Arrow, ArrowChoice, etc.:

    import qualified Data.Conduit as C

    newtype Conduit m i o = Conduit (C.Conduit i m o)

    -- May need Monad constraints for these
    instance Category (Conduit m)
    instance Arrow (Conduit m)
    instance ArrowChoice (Conduit m)

Does 'Conduit' follow Category, Monad, MonadTrans laws* these days? I'm not talking about Pipe in general, just the special case of it represented by the 'Conduit' type alias:

    Conduit i m o = ConduitM i o m () = Pipe i i o () m ()

Or are there some thorny issues (e.g. leftovers) that make following these laws impossible in some cases?

Thanks for the input,
-Joey

* Assume functions that use Data.Conduit.Internal do so correctly.
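To make the link between Flush and a generic 'right' concrete: Flush a has the same shape as Either () a, so a generic 'right' would cover the Flush case as well. The following is only a pure, list-level sketch of that idea, not a Conduit implementation. The local Flush type copies the shape of conduit's type so that only base is needed, and rightList, flushToEither, eitherToFlush and throughFlush are hypothetical names introduced for illustration. The list model only covers stateless, per-item transformers.

```haskell
-- Local copy of the shape of conduit's Flush type (assumption: defined here
-- so the sketch needs nothing beyond base).
data Flush a = Chunk a | Flush
  deriving (Eq, Show)

-- Flush a carries the same information as Either () a.
flushToEither :: Flush a -> Either () a
flushToEither Flush     = Left ()
flushToEither (Chunk a) = Right a

eitherToFlush :: Either () a -> Flush a
eitherToFlush (Left ()) = Flush
eitherToFlush (Right a) = Chunk a

-- List-level stand-in for 'right': Left values bypass the transformer,
-- Right values go through it and may produce zero or more outputs.
rightList :: (a -> [b]) -> [Either x a] -> [Either x b]
rightList f = concatMap (either (\x -> [Left x]) (map Right . f))

-- Lift a per-item transformer so that Flush markers pass through untouched.
throughFlush :: (a -> [b]) -> [Flush a] -> [Flush b]
throughFlush f = map eitherToFlush . rightList f . map flushToEither
```

In GHCi, `throughFlush (\n -> [show n]) [Chunk 1, Flush, Chunk 2]` evaluates to `[Chunk "1",Flush,Chunk "2"]`: the Flush marker keeps its position while the chunks are transformed.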

On Thu, Feb 28, 2013 at 9:18 PM, Joey Adams wrote:
> Can I transform a conduit so some values are passed through unchanged, but others go through the conduit? For example:
>
>     right :: Conduit i m o -> Conduit (Either x i) m (Either x o)

Actually, I didn't need this after all. I'm using Automaton from the arrows package for the first part of my pipeline. Only the zlib compression step is a Conduit, so I can just use arrow functions to lift Flush to the rest.

Nonetheless, someone else might want to do this. Now that I think of it, not all of the arrow operations make sense (in particular, (***)), but splitting data between conduits (like ArrowChoice (+++)) does make sense, I think.
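For readers unfamiliar with the ArrowChoice combinators mentioned above, here is a small sketch of what 'right' and (+++) do for the plain function arrow from base. It says nothing about Conduit itself; bump and split are hypothetical names used only for illustration.

```haskell
import Control.Arrow (right, (+++))

-- 'right' applies the function to Right values and leaves Left values alone.
bump :: Either String Int -> Either String Int
bump = right (+ 1)

-- (+++) routes Left values through one function and Right values through
-- another, keeping the two streams separate.
split :: Either Int String -> Either String Int
split = show +++ length
```

Here `bump (Left "skip")` is `Left "skip"`, `bump (Right 1)` is `Right 2`, `split (Left 3)` is `Left "3"`, and `split (Right "abc")` is `Right 3`. A Conduit analogue of (+++) would dispatch each input to one of two conduits in the same way.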

On Fri, Mar 1, 2013 at 4:18 AM, Joey Adams wrote:
> Can I transform a conduit so some values are passed through unchanged, but others go through the conduit? For example:
>
>     right :: Conduit i m o -> Conduit (Either x i) m (Either x o)
>
> This is named after the Control.Arrow combinator of the same name:
>
>     right :: ArrowChoice a => a b c -> a (Either d b) (Either d c)
>
> Here's my use case (simplified): I want to compress data with zlib-conduit, which provides:
>
>     compress :: Conduit (Flush ByteString) m (Flush ByteString)
>
> The Flush wrapper (http://hackage.haskell.org/packages/archive/conduit/latest/doc/html/Data-Con...) lets me flush the compressor so it will yield cached data right away (though hurting compression a little).
>
> But before compressing the data, I want to encode it, using this conduit:
>
>     encode :: Conduit Entry m ByteString
>
> I want to combine these, so that if I send a 'Flush', it bypasses 'encode' and feeds to 'compress':
>
>     compressEncode :: Conduit (Flush Entry) m (Flush ByteString)
>
> Thus, I need a variant of 'encode' that passes 'Flush' along:
>
>     encode' :: Conduit (Flush Entry) m (Flush ByteString)
>
> In my actual program, I don't use Flush, so providing a Conduit combinator just for Flush would not help me.
>
> Is something like 'right' possible to implement with Conduit's public API? Here's an implementation using Data.Conduit.Internal (untested):
>
>     import Control.Monad (liftM)
>     import Data.Conduit.Internal (Pipe(..), ConduitM(..), Conduit)
>
>     right :: Monad m => Conduit i m o -> Conduit (Either x i) m (Either x o)
>     right = ConduitM . rightPipe . unConduitM
>
>     rightPipe :: Monad m
>               => Pipe i i o () m ()
>               -> Pipe (Either x i) (Either x i) (Either x o) () m ()
>     rightPipe p0 = case p0 of
>         HaveOutput p c o -> HaveOutput (rightPipe p) c (Right o)
>         NeedInput p c    -> NeedInput p' (rightPipe . c)
>           where p' (Left x)  = HaveOutput (rightPipe p0) (return ()) (Left x)
>                 p' (Right i) = rightPipe $ p i
>         Done r           -> Done r
>         PipeM mp         -> PipeM $ liftM rightPipe mp
>         Leftover p i     -> Leftover (rightPipe p) (Right i)

I'm fairly certain this cannot be implemented using only the public API. Your implementation looks solid to me.

> I'm wondering if we could have a Data.Conduit.Arrow module, which provides a newtype variant of Conduit that implements Arrow, ArrowChoice, etc.:
>
>     import qualified Data.Conduit as C
>
>     newtype Conduit m i o = Conduit (C.Conduit i m o)
>
>     -- May need Monad constraints for these
>     instance Category (Conduit m)
>     instance Arrow (Conduit m)
>     instance ArrowChoice (Conduit m)

As I think you point out in your next email, Conduit can't really be an instance of Arrow. IIRC, there was quite a bit of talk about that when pipes came out, but some of the features of a Pipe (such as allowing input and output to occur at different "speeds") mean that it can't be achieved. Nonetheless, I think adding some helping combinators based around Arrows for Conduit makes sense.

> Does 'Conduit' follow Category, Monad, MonadTrans laws* these days? I'm not talking about Pipe in general, just the special case of it represented by the 'Conduit' type alias:
>
>     Conduit i m o = ConduitM i o m () = Pipe i i o () m ()
>
> Or are there some thorny issues (e.g. leftovers) that make following these laws impossible in some cases?

It's easy to prove that a Conduit with leftovers does not follow the Category laws:

    id = awaitForever yield
    (.) = (=$=)

    id . leftover x /= leftover x

That was the motivation for adding the leftover type parameter to the Pipe datatype: if you want to get closer to a Category instance (whatever "closer" would mean here), you need to make sure that the leftover parameter is set to Void.

However, even in such a case, there's at least one deviation from strict Category behavior. The order in which finalizers are run does not fully respect the associative laws [1]. In this case, the deviation is intentional: conduit is more concerned with ensuring strict resource usage than associativity. I touched on this point briefly in a recent conduit 1.0 blog post.

In my opinion, this is evidence that Category is not the right abstraction to be used for streaming data, since it doesn't give us the ability to guarantee prompt finalization.

[1] https://github.com/snoyberg/conduit/pull/57

> Thanks for the input,
> -Joey
>
> * Assume functions that use Data.Conduit.Internal do so correctly.
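The identity-law failure described above can be observed directly. The sketch below assumes a recent conduit release (the 1.3 series), whose names (ConduitT, (.|), runConduitPure) differ from the conduit 1.0 operators used in this thread; idC, pushBack and remainingAfter are hypothetical helper names. It runs a consumer that only pushes a value back, then collects whatever input is still available.

```haskell
import Data.Conduit (ConduitT, awaitForever, leftover, runConduitPure, yield, (.|))
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (Identity)
import Data.Void (Void)

-- Stand-in for the thread's `id = awaitForever yield`.
idC :: Monad m => ConduitT a a m ()
idC = awaitForever yield

-- Push one value back onto the input stream without consuming anything.
pushBack :: Monad m => ConduitT Int o m ()
pushBack = leftover 0

-- Run the given consumer on the input [1,2,3], then collect all input
-- that the next consumer in the same monadic sequence can still see.
remainingAfter :: ConduitT Int Void Identity () -> [Int]
remainingAfter c = runConduitPure (CL.sourceList [1, 2, 3] .| (c >> CL.consume))

-- remainingAfter pushBack           == [0,1,2,3]  (the leftover is visible)
-- remainingAfter (idC .| pushBack)  == [1,2,3]    (fusion discards the leftover)
```

Composing with the would-be identity changes the observable result, because fusion drops leftovers produced by the downstream component. That is the sense in which `id . leftover x /= leftover x`.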

Thanks for the response. I spent some time thinking about leftovers and understand the Category issue now. Thanks for clearing that up.

While trying to work conduits into a program I'm working on, I find myself wanting something more powerful: a resumable Conduit.

For example, consider receiving a stream of messages over a network:

    data Message
        = Data ByteString
        | CompressedData ByteString
        | RestartCompressor

When CompressedData is received, feed the bytes to a decompressor conduit. When RestartCompressor is received, close the first decompressor conduit and fire up a new one.

Supporting restarts needs more than just Conduit i m o -> Conduit (Either x i) m (Either x o). It involves opening and closing a conduit within another conduit's operations.

Here's a possible API for a resumable Conduit:

    newtype ResumableConduit i m o = -- hidden --

    newResumableConduit :: Monad m => Conduit i m o -> ResumableConduit i m o

    -- | Feed the 'Source' through the conduit, and send any output from the
    -- conduit to the 'Sink'. When the 'Sink' returns, close the 'Source', but
    -- leave the 'ResumableConduit' open so more data can be passed through it.
    runResumableConduit
        :: Monad m
        => ResumableConduit i m o
        -> Source m i
        -> Sink o m r
        -> m (ResumableConduit i m o, r)

    -- | Tell the conduit there is no more input available, and send the remaining
    -- output (if any) to the 'Sink'.
    closeResumableConduit
        :: Monad m
        => ResumableConduit i m o
        -> Sink o m r
        -> m r

Does anyone want to comment on this interface?

Perhaps conduit could have a module called "Data.Conduit.Resumable" that contains ResumableSource, ResumableConduit, and ResumableSink. The conduit-resumablesink package by Andrew Miller [1] implements ResumableSink; it just needs to be updated for conduit 1.0.

[1]: http://hackage.haskell.org/package/conduit-resumablesink
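The feed-then-resume-then-close pattern in the proposed API can be modeled in pure code. The following is a toy model under stated assumptions, not the proposed ResumableConduit: there is no monad, no Source or Sink, and no finalizers. Resumable, feed and pairs are hypothetical names; feed plays the role of runResumableConduit and the close field plays the role of closeResumableConduit.

```haskell
-- A toy resumable transformer: it can be fed input one item at a time and
-- is told explicitly when the stream has ended.
data Resumable i o = Resumable
  { step  :: i -> ([o], Resumable i o)  -- feed one input, stay open
  , close :: [o]                        -- end of input: flush what is buffered
  }

-- Feed a batch of inputs. Returns the still-open transformer together with
-- the outputs produced so far, so the caller can resume later.
feed :: Resumable i o -> [i] -> (Resumable i o, [o])
feed r []       = (r, [])
feed r (x : xs) =
  let (out, r')   = step r x
      (r'', rest) = feed r' xs
  in (r'', out ++ rest)

-- Example transformer: groups inputs into pairs and keeps an unpaired item
-- buffered across batches until it is either paired or flushed by close.
pairs :: Resumable a [a]
pairs = go []
  where
    go buf = Resumable
      { step = \x -> case buf of
          [y] -> ([[y, x]], go [])
          _   -> ([], go [x])
      , close = [buf | not (null buf)]
      }
```

Feeding `[1,2,3]` yields `[[1,2]]` and leaves 3 buffered; feeding `[4,5]` to the returned transformer yields `[[3,4]]`; closing it then yields `[[5]]`. The buffered state survives between batches, which is the property a plain fused conduit does not expose.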

On Sun, Mar 3, 2013 at 10:24 PM, Joey Adams wrote:
> ... Here's a possible API for a resumable Conduit:
>
>     newtype ResumableConduit i m o = -- hidden --
>
>     newResumableConduit :: Monad m => Conduit i m o -> ResumableConduit i m o
>
>     -- | Feed the 'Source' through the conduit, and send any output from the
>     -- conduit to the 'Sink'. When the 'Sink' returns, close the 'Source', but
>     -- leave the 'ResumableConduit' open so more data can be passed through it.
>     runResumableConduit
>         :: Monad m
>         => ResumableConduit i m o
>         -> Source m i
>         -> Sink o m r
>         -> m (ResumableConduit i m o, r)
> ...

While trying to implement this, I found a more elegant interface for resuming the ResumableConduit:

    -- | Fuse a 'ResumableConduit' to a 'Sink'. When the 'Sink' returns,
    -- it returns the 'ResumableConduit' so the caller can reuse it.
    (=$++) :: Monad m
           => ResumableConduit i m o
           -> Sink o m r
           -> Sink i m (ResumableConduit i m o, r)

This takes advantage of Sink's return value to forward the ResumableConduit. I don't think a ($=++) can be implemented.

Advantages:

* (=$++) is easier to implement than 'runResumableConduit' since it only has to fuse two pipes together instead of three.

* Pretty syntax:

      (resumable', a) <- source $$ resumable =$++ sink

Wow, I hadn't realized that someone had implemented resumable sinks... and now resumable conduits too! Very interesting.

I'm not sure if I entirely understand your use case, but in general it should be possible to have multiple Conduits running one after the other. Here's an example of restarting an accumulator after every multiple of 5:

https://www.fpcomplete.com/user/snoyberg/random-code-snippets/multiple-condu...

Michael

On Tue, Mar 5, 2013 at 9:24 AM, Michael Snoyman wrote:
> ... I'm not sure if I entirely understand your use case, but in general it should be possible to have multiple Conduits running one after the other. Here's an example of restarting an accumulator after every multiple of 5:
>
> https://www.fpcomplete.com/user/snoyberg/random-code-snippets/multiple-condu...

Neat. I didn't think to do that with plain Conduits. I did realize I could use a resumable conduit as a "temporary filter" (basically what your example does). This suggests that a resumable conduit can be used in any consumer (Conduit or Sink), not just a sink. Perhaps it can even be used in a producer, though different operators would be needed (+$= instead of =$+).

In my compression example, the incoming message sink needs to feed chunks of compressed data to a zlib conduit. It can't just hand full control of the input to zlib; it has to decode messages, and only send CompressedData messages through zlib. I need a resumable conduit for that.

Here's my current implementation of resumable conduits [1]. I don't know much about conduit finalizers; I mostly followed 'connectResume' and 'pipeL'.

The main wrinkle is that when the ResumableConduit receives an upstream terminator, it forwards it to the sink, rather than telling the conduit that the stream ended. This allows the conduit to be reused. Only when we finish the ResumableConduit () do we send it the stream terminator.

I'll continue toying with this. It might be possible to factor out terminator forwarding, and generalize connectResume to support resumable sources, conduits, and sinks.

Thanks for the help,
-Joey

[1]: https://github.com/joeyadams/hs-resumable-conduit/blob/master/ResumableCondu...

On Wed, Mar 6, 2013 at 5:48 AM, Joey Adams wrote:
> On Tue, Mar 5, 2013 at 9:24 AM, Michael Snoyman wrote:
>> ... I'm not sure if I entirely understand your use case, but in general it should be possible to have multiple Conduits running one after the other. Here's an example of restarting an accumulator after every multiple of 5:
>>
>> https://www.fpcomplete.com/user/snoyberg/random-code-snippets/multiple-condu...
>
> Neat. I didn't think to do that with plain Conduits. I did realize I could use a resumable conduit as a "temporary filter" (basically what your example does). This suggests that a resumable conduit can be used in any consumer (Conduit or Sink), not just a sink. Perhaps it can even be used in a producer, though different operators would be needed (+$= instead of =$+).
>
> In my compression example, the incoming message sink needs to feed chunks of compressed data to a zlib conduit. It can't just hand full control of the input to zlib; it has to decode messages, and only send CompressedData messages through zlib. I need a resumable conduit for that.

I'm still not sure I follow this. In the example I linked to, the go function within breaker could arbitrarily modify the data before it gets passed on to the inner Conduit. So it seems like it should be possible to achieve your goals this way. But I may just not fully understand your use case.

Michael

On Wed, Mar 6, 2013 at 1:42 AM, Michael Snoyman wrote:
> I'm still not sure I follow this. In the example I linked to, the go function within breaker could arbitrarily modify the data before it gets passed on to the inner Conduit. So it seems like it should be possible to achieve your goals this way. But I may just not fully understand your use case.

I would have to put my entire message handler in a Sink monad. Also, I'm not sure this approach would work if I wanted to use multiple conduits to process different types of messages, since everything has to go through the zlib conduit.

In any case, my existing code is a StateT computation. It'd be convenient if I could just make the ResumableConduit part of my state, rather than turning all that code into a Sink.

I pushed a "resumable" branch [1] with a (stub) Data.Conduit.Resumable module. It has ResumableSource, ResumableConduit, and ResumableSink. Data.Conduit re-exports ResumableSource operations.

[1]: https://github.com/joeyadams/conduit/tree/resumable
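The idea of keeping a resumable transformer inside the handler's own state, rather than restructuring the handler as a Sink, can be sketched in pure code. This is a toy model only: the state is threaded with a fold instead of StateT, payloads are Strings instead of ByteStrings, and the "decompressor" is a hypothetical stand-in (numberChunks) that merely numbers the chunks it has seen since the last restart. Resumable, HandlerState, handle and runHandler are likewise names invented for illustration.

```haskell
import Data.List (foldl')

-- Message type from earlier in the thread, with String payloads
-- (assumption: simplified so that only base is needed).
data Message
  = Data String
  | CompressedData String
  | RestartCompressor

-- Toy resumable transformer: one output per input, plus its continuation.
newtype Resumable i o = Resumable { step :: i -> (o, Resumable i o) }

-- Stand-in for a decompressor: tags each chunk with its position since
-- the last restart, so retained state is visible in the output.
numberChunks :: Resumable String String
numberChunks = go (1 :: Int)
  where
    go n = Resumable $ \chunk -> (show n ++ ":" ++ chunk, go (n + 1))

-- The handler's state holds the open transformer next to its other data.
data HandlerState = HandlerState
  { decoder :: Resumable String String
  , output  :: [String]  -- collected in reverse order
  }

handle :: HandlerState -> Message -> HandlerState
handle st (Data s)           = st { output = s : output st }
handle st (CompressedData s) =
  let (o, d') = step (decoder st) s
  in st { decoder = d', output = o : output st }
handle st RestartCompressor  = st { decoder = numberChunks }

runHandler :: [Message] -> [String]
runHandler = reverse . output . foldl' handle (HandlerState numberChunks [])
```

For the input `[CompressedData "a", Data "x", CompressedData "b", RestartCompressor, CompressedData "c"]`, runHandler returns `["1:a","x","2:b","1:c"]`: plain Data messages bypass the transformer without disturbing its state, and RestartCompressor replaces it with a fresh one.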
Participants (2): Joey Adams, Michael Snoyman