conduit/pipes/streaming and prompt cleanup

I'm using the 'streaming' library and realized it doesn't close files in a timely way for the way I'm using it, and in fact can't, due to how the library works. I know conduit has put a lot of thought into closing resources in a timely way, so I did an experiment to see what it does, but as far as I can tell, conduit has the same problem. Maybe I'm doing it wrong?

The situation is that I'm opening multiple files and mixing their output. I want to close the inputs as soon as I'm done with them, but I can be done with them before the end of the file. Since all of these libraries are based on pulling from downstream, if you don't pull all the way to the end, the close at the end doesn't happen, and has to wait until runResourceT returns, which is too late. I remember reading Oleg's original iteratee paper long ago, and it seems like he called out this problem with pull-based iterators: the iterator doesn't know when its caller is done with it, so it can't close files on time.

Here's a conduit version that I think illustrates the situation:

import qualified Conduit as C
import Conduit ((.|))
import qualified Control.Monad.Trans as Trans
import qualified System.IO as IO

main :: IO ()
main = C.runResourceT $ C.runConduit pipe

pipe :: C.ConduitM a c (C.ResourceT IO) ()
pipe = fileLines "TODO" .| (C.takeC 3 >> C.yield "***") .| C.mapM_C (Trans.liftIO . putStrLn)

fileLines :: C.MonadResource m => FilePath -> C.ConduitT i String m ()
fileLines fname = C.bracketP (IO.openFile fname IO.ReadMode) close handleLines

handleLines :: Trans.MonadIO m => IO.Handle -> C.ConduitT i String m ()
handleLines hdl = loop
  where
    loop = do
        eof <- Trans.liftIO $ IO.hIsEOF hdl
        if eof
            then return ()
            else do
                line <- Trans.liftIO $ IO.hGetLine hdl
                C.yield line
                loop

close :: IO.Handle -> IO ()
close hdl = IO.hClose hdl >> putStrLn "=== close"

This prints the first three lines of TODO, then ***, and then "=== close", whereas the close should come before the ***.
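To make the ordering concrete without depending on conduit, here is a toy model of the same situation in plain Haskell (a sketch, not conduit's implementation). A pull-based producer runs its cleanup only when a pull reaches end of input, a consumer takes 3 elements, and the enclosing scope runs any leftover cleanup at the very end, standing in for runResourceT. The names fakeFileLines and takeP are hypothetical, and a list plays the role of the file. Nothing in takeP tells the producer that the third pull was the last one, so the close can only happen after the ***.

```haskell
import Data.IORef

-- Toy pull-based producer: 'pull' returns the next line, or Nothing at EOF.
newtype Producer = Producer { pull :: IO (Maybe String) }

-- Hypothetical stand-in for fileLines: a list plays the role of the file.
-- Cleanup runs when a pull hits end of input; the second component is
-- the same (idempotent) cleanup, for the enclosing scope to run at the end.
fakeFileLines :: (String -> IO ()) -> [String] -> IO (Producer, IO ())
fakeFileLines logMsg contents = do
  rest   <- newIORef contents
  closed <- newIORef False
  let close = do
        already <- readIORef closed
        if already then pure () else writeIORef closed True >> logMsg "=== close"
      next = do
        xs <- readIORef rest
        case xs of
          []       -> close >> pure Nothing
          (l : ls) -> writeIORef rest ls >> pure (Just l)
  pure (Producer next, close)

-- Pull at most n lines, handing each to 'emit'. Like takeC, it simply
-- stops pulling; the producer is never told it is no longer needed.
takeP :: Int -> (String -> IO ()) -> Producer -> IO ()
takeP n emit p
  | n <= 0    = pure ()
  | otherwise = do
      m <- pull p
      case m of
        Nothing -> pure ()
        Just l  -> emit l >> takeP (n - 1) emit p

demo :: IO [String]
demo = do
  logRef <- newIORef []
  let logMsg s = modifyIORef logRef (++ [s])
  (p, scopeEnd) <- fakeFileLines logMsg ["a", "b", "c", "d", "e"]
  takeP 3 logMsg p
  logMsg "***"
  scopeEnd  -- stands in for runResourceT releasing whatever is still open
  readIORef logRef

main :: IO ()
main = demo >>= mapM_ putStrLn
```

Running main prints a, b, c, ***, and only then === close, which is the same late ordering as the conduit program.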
As far as I can see, conduit can't do this any more than 'streaming' can, because 'C.takeC' is just some awaits and yields, with no indication that the final await is more special than any other await.

I think what would be necessary to solve this is that combinators like 'take' have to be able to tell the stream to close, and that has to propagate back up to each producer that has registered a cleanup. Of course this renders the stream invalid, so it's not safe to keep any other references to the stream around, but streams are stateful in general, so that's always true. Maybe I could accumulate the finalizers in the stream data type and have combinators like 'take' call them as soon as they've taken their last element. What I actually wound up doing was to make a 'takeClose' that also takes a 'close' action to run when it's done with the stream. It's not exactly general, but I'm not writing a library, so I don't need general.

Is there some kind of standard or built-in solution for this situation? I know others have given a lot more thought to streaming than I have, so surely this issue has come up.

I know there is a lot of talk about "prompt finalisation" and streams vs. pipes vs. conduit, and talk about brackets and whatnot, but despite reading various documents (https://hackage.haskell.org/package/streaming-with, http://www.haskellforall.com/2013/01/pipes-safe-10-resource-management-and.h..., etc.) I still don't really understand what they're talking about. They seem to be about reliable cleanup in the presence of exceptions, not about prompt cleanup. Certainly pipes-safe doesn't do prompt cleanup, at least not the kind I'm talking about.
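The "accumulate the finalizers in the stream data type" and 'takeClose' ideas can be sketched as another toy model in plain Haskell. This is not the streaming or conduit API: Stream, fromListWithClose, mapS and takeClose are hypothetical names, and a list again stands in for the file. Each stream carries the cleanup of everything upstream of it, a transformer like mapS passes that cleanup along unchanged, and takeClose runs it right after its last element, so the close lands before the ***. As the message says, the stream is invalid afterwards and must not be pulled again.

```haskell
import Data.IORef

-- Toy stream that carries the finalizers of every producer upstream of it.
data Stream a = Stream
  { pull   :: IO (Maybe a)  -- next element, or Nothing at end of input
  , closeS :: IO ()         -- run all registered cleanup (idempotent)
  }

-- Hypothetical source: a list stands in for a file, 'fin' is its cleanup.
fromListWithClose :: IO () -> [a] -> IO (Stream a)
fromListWithClose fin xs0 = do
  ref  <- newIORef xs0
  done <- newIORef False
  let close = do
        d <- readIORef done
        if d then pure () else writeIORef done True >> fin
      next = do
        xs <- readIORef ref
        case xs of
          []       -> close >> pure Nothing
          (y : ys) -> writeIORef ref ys >> pure (Just y)
  pure (Stream next close)

-- Transformers keep the upstream finalizer, so a close request propagates.
mapS :: (a -> b) -> Stream a -> Stream b
mapS f s = Stream (fmap (fmap f) (pull s)) (closeS s)

-- Hand up to n elements to 'emit', then explicitly close the stream.
-- After this the stream is invalid and must not be pulled again.
takeClose :: Int -> (a -> IO ()) -> Stream a -> IO ()
takeClose n emit s
  | n <= 0    = closeS s
  | otherwise = do
      m <- pull s
      case m of
        Nothing -> pure ()  -- end of input already ran the cleanup
        Just x  -> emit x >> takeClose (n - 1) emit s

demo :: IO [String]
demo = do
  logRef <- newIORef []
  let logMsg s = modifyIORef logRef (++ [s])
  src <- fromListWithClose (logMsg "=== close") [1 .. 5 :: Int]
  takeClose 3 logMsg (mapS show src)
  logMsg "***"
  readIORef logRef

main :: IO ()
main = demo >>= mapM_ putStrLn
```

Running main prints 1, 2, 3, === close, ***. The cost is the one raised later in the thread: every combinator that stops early has to remember to call closeS, and forgetting it in one place reintroduces the delay.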

I think you're seeing the fact that conduit no longer has finalizers, see:
https://www.snoyman.com/blog/2018/01/drop-conduits-finalizers
Though I don't think your example proves the point. You know that the code
following `takeC 3` will never `await`, but there's no way for the conduit
library to know that. Instead, if you wanted to demonstrate the limitation
of removing finalizers you'd need to rewrite your code to this:
pipe :: C.ConduitM a c (C.ResourceT IO) ()
pipe = ((fileLines "TODO" .| C.takeC 3) >> C.yield "***")
    .| C.mapM_C (Trans.liftIO . putStrLn)
Here, we can immediately see the `fileLines` is not going to be `await`ed
from again once `takeC 3` is complete, and yet without finalizers the close
is still delayed. However, now that I've done that slight rewrite, we can
make a further rewrite to leverage the bracket pattern and get back prompt
finalization:
pipe :: C.ConduitM a c (C.ResourceT IO) ()
pipe = ((withFileLines "TODO" (C.takeC 3)) >> C.yield "***")
    .| C.mapM_C (Trans.liftIO . putStrLn)

withFileLines
    :: C.MonadResource m
    => FilePath
    -> C.ConduitT String o m r
    -> C.ConduitT i o m r
withFileLines fname inner =
    C.bracketP
        (IO.openFile fname IO.ReadMode)
        close
        (\h -> handleLines h .| inner)
On Tue, Jun 4, 2019 at 12:17 AM Evan Laforge
_______________________________________________
Haskell-Cafe mailing list
To (un)subscribe, modify options or view archives go to:
http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
Only members subscribed via the mailman list are allowed to post.

Hi, thanks for the response!
> I think you're seeing the fact that conduit no longer has finalizers, see:
> https://www.snoyman.com/blog/2018/01/drop-conduits-finalizers
> Though I don't think your example proves the point. You know that the code
> following `takeC 3` will never `await`, but there's no way for the conduit
> library to know that. Instead, if you wanted to demonstrate the limitation
> of removing finalizers you'd need to rewrite your code to this:
The way I was looking at it, the problem is just that: takeC will never await again, but doesn't tell anyone about that, so conduit can't know it. This is why I was thinking of giving it a way to say so, by notifying upstream explicitly. I can see how that's an unsatisfying solution, because it means that every function that cuts off its input has to say so explicitly, and forgetting to do that in any one of them will lead to a leak. The bracket pattern encodes the scope of the open file into the shape of the expression, and if I can fit my program into that shape then I get automatic finalization, which is nicer.

The question then is: can I fit it into that shape? If not, then it's at least the kind of data point you were asking for in the blog post. If I can, but only for conduit and not for streaming, then it's a demonstration of a difference in expressiveness between the two, which is interesting.

The complicating factor is that I'm streaming multiple overlapping files, which are all demanded by a single sink. So it's not the concatenation that (>>) does. E.g.:

mainLoop (events :: [(Int, FilePath)]) = loop 0 [] events
  where
    loop now running events = do
        let (toStart, notYet) = span ((<=now) . fst) events
        starting <- mapM startStream toStart
        chunks <- mapMaybeM await (running ++ starting)
        yield (mix chunks)
        loop (now+n) (running ++ starting) notYet
    startStream = takeC 3 . fileLines

There's some missing stuff to drop streams that ran out, and to stop the loop once the events and streams run out, but you get the idea. I can't figure out where the (>>) would go that would terminate one of those streams, or even what "terminate" actually means, if it's not something that takeC does explicitly.
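For comparison, the explicit-close approach can at least be applied to a mixing loop of this shape. The following is a toy model in plain Haskell, not a conduit or streaming program: openLimited, mixLoop and the integer chunks are hypothetical, and openLimited folds the "takeC 3" into the source so that the pull returning the last permitted chunk also runs the cleanup. Each tick starts the streams that are due, pulls one chunk from every running stream, sums them as a stand-in for mix, and drops streams that have ended. Each close shows up in the log as soon as that stream's last chunk is pulled, not when the whole loop finishes.

```haskell
import Data.IORef

-- Toy stream: 'pull' yields the next chunk, or Nothing once it has ended.
newtype Stream a = Stream { pull :: IO (Maybe a) }

-- Hypothetical source playing the role of 'takeC limit . fileLines':
-- a list stands in for the file, and the pull that returns the last
-- permitted chunk also runs the cleanup 'fin' immediately.
openLimited :: IO () -> Int -> [a] -> IO (Stream a)
openLimited fin limit xs0 = do
  ref  <- newIORef (take limit xs0)
  done <- newIORef False
  let close = do
        d <- readIORef done
        if d then pure () else writeIORef done True >> fin
      next = do
        xs <- readIORef ref
        case xs of
          [] -> close >> pure Nothing
          (y : ys) -> do
            writeIORef ref ys
            if null ys then close else pure ()  -- last chunk: close now
            pure (Just y)
  pure (Stream next)

-- One tick per time step: start due streams, pull a chunk from each running
-- stream, log their sum (standing in for 'mix'), and drop ended streams.
mixLoop :: (String -> IO ()) -> [(Int, IO (Stream Int))] -> IO ()
mixLoop logMsg = loop 0 []
  where
    loop now running events = do
      let (toStart, notYet) = span ((<= now) . fst) events
      starting <- mapM snd toStart
      results <- mapM (\s -> fmap (\m -> (s, m)) (pull s)) (running ++ starting)
      let chunks = [c | (_, Just c) <- results]
          alive  = [s | (s, Just _) <- results]
      if null alive && null notYet
        then pure ()
        else do
          logMsg ("t=" ++ show now ++ " mix=" ++ show (sum chunks))
          loop (now + 1) alive notYet

demo :: IO [String]
demo = do
  logRef <- newIORef []
  let logMsg s = modifyIORef logRef (++ [s])
  mixLoop logMsg
    [ (0, openLimited (logMsg "close A") 3 (repeat 1))
    , (1, openLimited (logMsg "close B") 3 (repeat 10)) ]
  readIORef logRef

main :: IO ()
main = demo >>= mapM_ putStrLn
```

Stream A starts at t=0 and B at t=1, so "close A" is logged during tick 2 and "close B" during tick 3, each before that tick's mix line. This only sidesteps the question about (>>), since the cut-off point is still declared explicitly by the source rather than by the shape of the expression.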
participants (2): Evan Laforge, Michael Snoyman