streaming package: How to demux a stream properly?

I have a streaming package question.

Suppose I have a:

    Stream (Of (Int, a)) m ()

I would like to demultiplex it into Vector (Stream (Of a) m ()), using the Int as the index of an item into the Vector of output streams.

How can I do this efficiently (constant memory and linear time)?

Does the following work?

    {-# LANGUAGE ScopedTypeVariables #-}
    import Control.Monad.IO.Class (MonadIO)
    import Data.Vector (Vector)
    import qualified Data.Vector as V
    import qualified Data.Vector.Mutable as VM
    import Streaming (Of, Stream)
    import qualified Streaming.Prelude as SP

    type StreamOf a m r = Stream (Of a) m r

    demuxStream :: forall a m. MonadIO m => Int -> StreamOf (Int, a) m () -> m (Vector (StreamOf a m ()))
    demuxStream numSyms stream =
      let -- one initially empty output stream per index
          emptyStreams = V.replicate numSyms (pure ())
          -- append x to the output stream at index iD
          processItem v (iD, x) = V.modify (\vm -> VM.modify vm (>> SP.yield x) iD) v
      in SP.fold_ processItem emptyStreams id stream

My guess is that it takes more than constant memory, as it goes through the entire input stream before returning.

Josh

I'm not sure it's possible to do that efficiently.

Here's a hypothetical situation: you have a stream with elements tagged as 1 and 2, but none tagged as 0. If somebody applies an operation to the stream of 0 elements (say stdoutLn), we have to process every single element, and perform every single effect, in the input stream before we know that the stream at index 0 is empty. In general, if we apply an operation to an element of one of the output streams, we'd have to process at minimum all the input elements up to and including that particular element. The important thing is that, semantically, the output of your demux operation is not n independent streams, but n views into a single stream.

It's probably possible to implement a version of this function that doesn't process the *entire* input stream up-front (it would just process as much of the input as it needed when you look at any given element in the output), but it probably needs a different type than just Vector to make it work correctly, and I'm not sure how to do that. More importantly, the behavior of this function would still be confusing; it might *look* like you have several distinct streams, but you'd have to do the effects and store the results of every single step in the input stream even if you only used one of your demuxed streams.

Does that explanation make sense? I haven't done much streaming stuff in a while, so I'm struggling a bit with how to express my intuitions about it.
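
The point about effects can be checked with a small experiment. The following is only a sketch, assuming the streaming and vector packages; the counter, the sample input, and the name `demo` are made up for illustration. It wraps the input with `SP.chain` so that every element pulled from the input bumps a counter, then reads the counter after the fold-based `demuxStream` returns but before any output stream is touched.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM
import Streaming (Of, Stream)
import qualified Streaming.Prelude as SP

-- The fold-based demux from the question, with a relaxed Monad constraint.
demuxStream :: Monad m
            => Int -> Stream (Of (Int, a)) m () -> m (V.Vector (Stream (Of a) m ()))
demuxStream n = SP.fold_ step (V.replicate n (pure ())) id
  where
    -- Append x to the output stream stored at index i.
    step v (i, x) = V.modify (\vm -> VM.modify vm (>> SP.yield x) i) v

-- Returns (input elements consumed before any output is used, output lengths).
demo :: IO (Int, [Int])
demo = do
  pulled <- newIORef (0 :: Int)
  let input = SP.chain (\_ -> modifyIORef' pulled (+ 1))
                       (SP.each [(1, 'a'), (2, 'b'), (1, 'c')])
  outs <- demuxStream 3 input
  consumed <- readIORef pulled            -- read before touching any output stream
  lens <- mapM SP.length_ (V.toList outs)
  pure (consumed, lens)

main :: IO ()
main = demo >>= print  -- prints (3,[0,2,1])
```

All three input elements are consumed before `demuxStream` returns, and the output streams are rebuilt from values held in memory, so this version needs space proportional to the input rather than constant space.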
On Wed, Apr 15, 2020 at 9:42 AM ☂Josh Chia (謝任中)
_______________________________________________
Haskell-Cafe mailing list
To (un)subscribe, modify options or view archives go to:
http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
Only members subscribed via the mailman list are allowed to post.

Not directly answering your question, but off the top of my head: why not demux into a

    Vector (Sink (Of a) m ())

I have no idea whether there is actually a Sink construct in any stream library, but the theory is that the Vector should contain reactive actions that take elements, rather than containers that collect elements (even thunks of them demand space).

Just my thoughts; I'm definitely a novice at this.

Cheers,
Compl
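
One way to read the sink idea, as a rough sketch only: the `Sink` type below is a hypothetical alias for a plain per-element action, not a construct from the streaming package, and `demuxInto` and `demo` are illustrative names. Each element is handed to the action at its index as soon as it arrives, so nothing is accumulated by the demux step itself.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import qualified Data.Vector as V
import Streaming (Of, Stream)
import qualified Streaming.Prelude as SP

-- Hypothetical alias: a "sink" is just an effectful consumer of one element.
type Sink a m = a -> m ()

-- Route each (index, value) pair to the sink at that index.
-- Out-of-range indices are silently dropped in this sketch.
demuxInto :: Monad m => V.Vector (Sink a m) -> Stream (Of (Int, a)) m r -> m r
demuxInto sinks = SP.mapM_ route
  where
    route (i, x) = maybe (pure ()) ($ x) (sinks V.!? i)

-- Three summing counters stand in for real consumers (files, sockets, ...).
demo :: IO [Int]
demo = do
  counters <- V.replicateM 3 (newIORef (0 :: Int))
  let sinks = V.map (\ref x -> modifyIORef' ref (+ x)) counters
  demuxInto sinks (SP.each [(1, 10), (2, 20), (1, 5)])
  mapM readIORef (V.toList counters)

main :: IO ()
main = demo >>= print  -- prints [0,15,20]
```

The trade-off is that the consumers have to be supplied up front as callbacks; the caller no longer gets streams back to compose with other stream operations.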
On 2020-04-16, at 00:40, ☂Josh Chia (謝任中)
participants (3)
- Tikhon Jelvis
- YueCompl
- ☂Josh Chia (謝任中)