Timeouts inside a ConduitParser

I have a ConduitParser https://hackage.haskell.org/package/conduit-parse-0.2.1.0/docs/Data-Conduit-... (a Sink with some parsing state) with a version of satisfy that can time out:

    satisfy :: MonadIO m => (a -> Bool) -> ConduitParser a m a
    satisfy pred = do
      tId <- liftIO myThreadId
      timeoutThread <- liftIO $ forkIO $ do
        threadDelay 1000000
        throwTo tId TimeoutException
      x <- await
      liftIO $ killThread timeoutThread
      if pred x then return x else empty

However, I would rather not deal with the risks of handling concurrency myself, and would prefer to use a system library like System.Timeout:

    satisfy :: MonadIO m => (a -> Bool) -> ConduitParser a m a
    satisfy pred = do
      x <- timeout 1000000 await
      if pred x then return x else empty

This doesn't work, though, since I need to be able to both lift and unlift await from IO, and ConduitParser sits on top of ConduitT, which is one of the monads that cannot be an instance of MonadUnliftIO https://github.com/fpco/unliftio#limitations. Are there any better approaches to this?

Due to the coroutine-based nature of conduit, this kind of approach isn't
possible. In short, when you're inside the `ConduitT` transformer, you
cannot use `timeout` on a `ConduitT` action because you need to yield
control of execution to a different coroutine. Instead, you should move the
`timeout` call to _outside_ the `ConduitT` parts, e.g.:

    timeout foo $ runConduit $ src .| satisfy .| sink

It seems like in this case, that kind of timeout usage is going to be too
coarse-grained. I haven't used it personally, but the `stm-conduit` package
probably has something in the direction you're looking for. Alternatively,
I put together an example of how this might be done using some standard
Haskell libraries like stm and async here:
https://gist.github.com/snoyberg/7e5dd52109b03c8bf1aa8fe1a7e522b9
The basic idea is to have two sibling threads: one running the original
source and writing its values to a queue, and another running the full
conduit pipeline with a modified source that will time out on reads from
that queue.
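
A minimal sketch of that two-thread idea follows. It is not the code from the gist: the names `Timed`, `feedQueue`, `timedSource` and `runWithReadTimeout`, the queue bound of 16, and the choice to end the stream with a `TimedOut` marker are all assumptions made for illustration. It assumes the `conduit`, `stm` and `async` packages are available.

```haskell
import Conduit
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Void (Void)
import System.Timeout (timeout)

-- | What the queue-backed source sends downstream (hypothetical type).
data Timed a = Item a | TimedOut
  deriving (Eq, Show)

-- | Producer thread: run the original source, push every value onto the
-- queue, then push Nothing to mark the end of input.
feedQueue :: TBQueue (Maybe a) -> ConduitT () a IO () -> IO ()
feedQueue q src = do
  runConduit $ src .| mapM_C (atomically . writeTBQueue q . Just)
  atomically $ writeTBQueue q Nothing

-- | Replacement source: read from the queue, waiting at most `micros`
-- microseconds per read. The timeout wraps a plain IO action, so no
-- ConduitT action ever has to be unlifted.
timedSource :: MonadIO m => Int -> TBQueue (Maybe a) -> ConduitT i (Timed a) m ()
timedSource micros q = loop
  where
    loop = do
      r <- liftIO $ timeout micros $ atomically $ readTBQueue q
      case r of
        Nothing       -> yield TimedOut          -- nothing arrived in time; stop
        Just Nothing  -> pure ()                 -- original source finished
        Just (Just x) -> yield (Item x) >> loop

-- | Run the producer as a sibling thread next to the real pipeline.
-- withAsync cancels the producer once the pipeline returns.
runWithReadTimeout
  :: Int -> ConduitT () a IO () -> ConduitT (Timed a) Void IO r -> IO r
runWithReadTimeout micros src sink = do
  q <- newTBQueueIO 16
  withAsync (feedQueue q src) $ \_ ->
    runConduit $ timedSource micros q .| sink

main :: IO ()
main = do
  -- A source that stalls for 2 seconds before its third element.
  let slowSrc = do
        yieldMany [1, 2 :: Int]
        liftIO (threadDelay 2000000)
        yield 3
  out <- runWithReadTimeout 500000 slowSrc sinkList
  print out  -- prints [Item 1,Item 2,TimedOut]
```

A parser sitting downstream would then match on `Item` and `TimedOut` instead of calling `timeout` itself. Note that this sketch does not propagate exceptions from the producer thread; if the original source throws, the consumer simply sees a timeout.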
(In reply to Luke Lau's message of Thu, Jun 21, 2018 at 4:08 PM.)
_______________________________________________
Haskell-Cafe mailing list
To (un)subscribe, modify options or view archives go to:
http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
Only members subscribed via the mailman list are allowed to post.
Participants (2): Luke Lau, Michael Snoyman