Not a direct answer to your question, but one idea off the top of my head:

  Why not demux into a Vector (Sink (Of a) m ())?

I have no idea whether any streaming library actually provides a Sink construct, but the idea is that the Vector should contain reactive actions that consume elements as they arrive, rather than containers that collect elements (even unevaluated thunks of those elements take up space).
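
To make that concrete, here is a rough sketch of what I have in mind. The Sink type and the demuxInto and demo names are made up for illustration (they are not from the streaming package); the only library calls assumed are SP.mapM_ and SP.each from Streaming.Prelude and a few functions from Data.Vector. Each pair is handed to its sink as soon as it arrives, so the demultiplexer itself holds nothing beyond the current element. Whether the whole thing stays in constant memory then depends on what the sinks do.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Data.Vector (Vector)
import qualified Data.Vector as V
import Streaming (Of, Stream)
import qualified Streaming.Prelude as SP

-- Hypothetical type: a "sink" is just an action that consumes one element.
type Sink a m = a -> m ()

-- Route each (index, item) pair to the sink at that index as it arrives.
-- Out-of-range indices are silently dropped (an arbitrary choice for this sketch).
demuxInto :: Monad m => Vector (Sink a m) -> Stream (Of (Int, a)) m r -> m r
demuxInto sinks = SP.mapM_ route
  where
    route (i, x) = maybe (pure ()) ($ x) (sinks V.!? i)

-- Demo only: each sink appends to its own IORef so the routing can be observed.
-- These demo sinks do accumulate; a real sink would write to a file, queue, etc.
demo :: IO [String]
demo = do
  refs <- V.replicateM 3 (newIORef [])
  let sinks = V.map (\ref x -> modifyIORef' ref (x :)) refs
  demuxInto sinks (SP.each [(0, 'a'), (2, 'b'), (0, 'c'), (1, 'd')])
  map reverse <$> mapM readIORef (V.toList refs)
```

Running demo should show the items grouped by index in arrival order, with index 0 receiving 'a' and 'c', index 1 receiving 'd', and index 2 receiving 'b'.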

Just my two cents; I'm definitely a novice at this.

Cheers,
Compl


On 2020-04-16, at 00:40, ☂Josh Chia (謝任中) <joshchia@gmail.com> wrote:

I have a streaming package question.

Suppose I have a:
Stream (Of (Int, a)) m ()

I would like to demultiplex it into Vector (Stream (Of a) m ()) using the Int as the index of an item into the Vector of output streams.

How can I do this efficiently (constant memory and linear time)?

Does the following work?
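{-# LANGUAGE ScopedTypeVariables #-}
import Control.Monad.IO.Class (MonadIO)
import Data.Vector (Vector)
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM
import Streaming (Of, Stream)
import qualified Streaming.Prelude as SP

type StreamOf a m r = Stream (Of a) m r

-- Fold over the whole input, appending each item to the output stream at its index.
demuxStream :: forall a m. MonadIO m
            => Int -> StreamOf (Int, a) m () -> m (Vector (StreamOf a m ()))
demuxStream numSyms stream =
  let emptyStreams = V.replicate numSyms (pure ())
      processItem v (iD, x) = V.modify (\vm -> VM.modify vm (>> SP.yield x) iD) v
  in SP.fold_ processItem emptyStreams id stream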

My guess is that it takes more than constant memory as it goes through the entire input stream before returning.

Josh
_______________________________________________
Haskell-Cafe mailing list