On Wed, Aug 27, 2014 at 9:19 PM, Bryan Vicknair <bryanvick@gmail.com> wrote:
Hello Cafe,

First I'd like to thank Michael Snoyman and Gabriel Gonzalez for the work
they've done on the conduit and pipes stream processing libraries, and all the
accompanying tutorial content.  I've been having fun converting a text
processing app from C to Haskell.

I'm seeing unexpectedly high memory usage in a stream-processing program that
uses the conduit library.

I've created an example that demonstrates the problem.  The program accepts gzip
files as arguments, and for each one, classifies each line as either Even or
Odd depending on the length, then outputs some result depending on the Sink
used.  For each gzip file:

  action :: GzipFilePath -> IO ()
  action (GzipFilePath filePath) = do
    result <- runResourceT $  CB.sourceFile filePath
                           $$ Zlib.ungzip
                           =$ CB.lines
                           =$ token
                           =$ sink2 (0, 0)
    putStrLn $ show result

The problem is the following Sink, which counts how many even/odd Tokens are
seen:

  type SinkState = (Integer, Integer)

  sink2 :: (Monad m) => SinkState -> Sink Token m SinkState
  sink2 state@(!evenCount, !oddCount) = do
    maybeToken <- await
    case maybeToken of
      Nothing     -> return state
      (Just Even) -> sink2 (evenCount + 1, oddCount    )
      (Just Odd ) -> sink2 (evenCount    , oddCount + 1)

When I give this program a few gzip files, it uses hundreds of megabytes of
resident memory.  When I give the same files as input, but use the following
simple Sink, it only uses about 8 MB of resident memory:

  sink1 :: MonadIO m => Sink Token m ()
  sink1 = awaitForever (liftIO . putStrLn . show)

At first I thought that sink2 performed so poorly because the addition thunks
were being placed onto the heap until the end, so I added some bang patterns to
make it strict.  That didn't help, however.

I've done profiling, but I'm unable to figure out exactly what is being added
to the heap in sink2 but not sink1, or what is being garbage collected in
sink1, but not sink2.


The full source is here:
https://bitbucket.org/bryanvick/conduit-mem/src/HEAD/hsrc/bin/mem.hs

Or you can clone the repo, which contains a cabal file for easy building:

  git clone git@bitbucket.org:bryanvick/conduit-mem.git
  cd conduit-mem
  cabal sandbox init
  cabal install --only-dependencies
  cabal build mem
  ./dist/build/mem/mem [GIVE SOME GZIP FILES HERE]

You can change which sink is used in the 'action' function to see the different
memory usage.

Wow, talk about timing! What you've run into here is expensive monadic bindings. As it turns out, this is exactly what my blog post from last week[1] covered. You have three options to fix this:

1. Just upgrade to conduit 1.2.0, which I released a few hours ago and which uses the codensity transform to avoid the problem. (I just tested your code; you get constant memory usage under conduit 1.2.0, seemingly without any code change necessary.)
2. Instead of writing your `sink2` as you have, express it in terms of Data.Conduit.List.fold, which associates the right way. This looks like:

    fold add (0, 0)
      where
        add (!x, !y) Even = (x + 1, y)
        add (!x, !y) Odd = (x, y + 1)

3. Allow conduit 1.1's rewrite rules to emulate the same behavior and bypass the expensive monadic bind. This can be done by replacing your current `await` with "await followed by bind", e.g.:

    sink2 :: (Monad m) => SinkState -> Sink Token m SinkState
    sink2 state@(!evenCount, !oddCount) = do
      await >>= maybe (return state) go
     where
      go Even = sink2 (evenCount + 1, oddCount    )
      go Odd  = sink2 (evenCount    , oddCount + 1)

I'd definitely recommend (1). I'd *also* recommend using (2), as the built-in functions will often be more efficient than something hand-rolled, especially now that stream fusion is being added[2]. With conduit 1.2, option (3) *will* be a bit more efficient still (it avoids creating an extra Maybe value), but not in a significant way.
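
For reference, here is a minimal, self-contained sketch of option (2). The Token type and the in-memory sample input are stand-ins assumed for illustration (the real Token and the gzip pipeline live in your mem.hs), and countTokens is just a hypothetical name for the fold-based sink:

```haskell
{-# LANGUAGE BangPatterns #-}
module Main where

import Data.Conduit (Sink, ($$))
import qualified Data.Conduit.List as CL

-- Assumed stand-in for the Token type defined in the original program.
data Token = Even | Odd deriving (Show, Eq)

type SinkState = (Integer, Integer)

-- Step function: the bang patterns force both counters each time a
-- token is processed, so pending additions do not pile up.
add :: SinkState -> Token -> SinkState
add (!x, !y) Even = (x + 1, y)
add (!x, !y) Odd  = (x, y + 1)

-- Hypothetical name: the counting sink expressed with CL.fold
-- instead of explicit recursion over await.
countTokens :: Monad m => Sink Token m SinkState
countTokens = CL.fold add (0, 0)

main :: IO ()
main = do
  -- A small in-memory source takes the place of the gzip pipeline.
  result <- CL.sourceList [Even, Odd, Odd, Even, Odd] $$ countTokens
  print result  -- prints (2,3)
```

In the real program, countTokens should be able to take the place of sink2 at the end of the pipeline in `action`, with the initial state supplied by the fold rather than passed as an argument.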

Michael

[1] https://www.fpcomplete.com/blog/2014/08/iap-speeding-up-conduit
[2] https://www.fpcomplete.com/blog/2014/08/conduit-stream-fusion