I'm not sure it's possible to do that efficiently.

Here's a hypothetical situation: you have a stream with elements tagged as 1 and 2, but none tagged as 0. If somebody applies an operation to the stream of 0 elements (say stdoutLn), we have to process every single element—and perform every single effect—in the input stream before we know that the stream at index 0 is empty. In general, if we apply an operation to an element of one of the output streams, we'd have to process at minimum all the input elements up to and including that particular element. The important thing is that, semantically, the output of your demux operation is not n independent streams, but n views into a single stream.

It's probably possible to implement a version of this function that doesn't process the *entire* input stream up-front—it would just process as much of the input as it needed when you look at any given element in the output—but it probably needs a different type than just Vector to make it work correctly, and I'm not sure how to do that. More importantly, the behavior of this function would still be confusing; it might *look* like you have several distinct streams, but you'd have to do the effects and store the results of every single step in the input stream even if you only used one of your demuxed streams.

Does that explanation make sense? I haven't done much streaming stuff in a while, so I'm struggling a bit with how to express my intuitions about it.

On Wed, Apr 15, 2020 at 9:42 AM ☂Josh Chia (謝任中) <joshchia@gmail.com> wrote:
I have a streaming package question.

Suppose I have a:
Stream (Of (Int, a)) m ()

I would like to demultiplex it into Vector (Stream (Of a) m ()) using the Int as the index of an item into the Vector of output streams.

How can I do this efficiently (constant memory and linear time)?

Does the following work?
import qualified Streaming.Prelude as SP
import qualified Data.Vector as V

type StreamOf a m r = Stream (Of a) m r

demuxStream :: forall a m. MonadIO m
=> Int -> StreamOf (Int, a) m () -> m (Vector (StreamOf a m ()))
demuxStream numSyms stream =
let emptyStreams = V.replicate numSyms (pure ())
processItem v (iD, x) = V.modify (\vm -> VM.modify vm (>> SP.yield x) iD) v
in SP.fold_ processItem emptyStreams id stream

My guess is that it takes more than constant memory as it goes through the entire input stream before returning.

Josh
_______________________________________________
Haskell-Cafe mailing list
To (un)subscribe, modify options or view archives go to:
http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
Only members subscribed via the mailman list are allowed to post.