Problems with iteratees

I'm working on a project that's using John Lato's old implementation of iteratees (iteratee >= 0.3.5 && < 0.4; I'm hoping to migrate to 0.7 soon, but that's a ways off yet) and I'm running into some issues I haven't been able to untangle. Maybe a new set of eyes can help...

The overarching program brings three things together for doing some interprocess communication: the medium is Posix FIFOs, the messages themselves are encoded with Google's Protocol Buffers[1], and the control flow for getting and processing the messages is handled by iteratees. The error message indicates iteratees are at fault, though it could be an error elsewhere instead.

First, some boilerplate.

    -- For messageWithLengthEnumeratee only
    {-# LANGUAGE ScopedTypeVariables #-}

    import qualified Text.ProtocolBuffers.Reflections as R
    import qualified Text.ProtocolBuffers.WireMessage as W
    import qualified Text.ProtocolBuffers.Get         as G
    import qualified Data.ByteString                  as S
    import qualified Data.ByteString.Lazy             as L
    import qualified Data.Iteratee                    as I
    import Data.Iteratee.WrappedByteString
    import Data.Word     (Word8)
    import Control.Monad (liftM)

    -- | Return a final value, and the remainder of the stream.
    idone :: a -> c el -> I.IterGV c el m a
    idone a xs = I.Done a (I.Chunk xs)
    {-# INLINE idone #-}

    -- | Convert a continuation into 'I.IterGV'.
    icontinue
        :: (I.StreamG c el -> m (I.IterGV c el m a))
        -> I.IterGV c el m a
    icontinue k = I.Cont (I.IterateeG k) Nothing
    {-# INLINE icontinue #-}

    -- | Throw an error message.
    ifail :: (Monad m) => String -> I.IterGV c el m a
    ifail msg = ierror (I.Err msg)
    {-# INLINE ifail #-}

    -- | An 'I.IterGV' variant of 'I.throwErr'.
    ierror :: (Monad m) => I.ErrMsg -> I.IterGV c el m a
    ierror err = I.Cont (I.throwErr err) (Just err)
    {-# INLINE ierror #-}

    toLazyBS :: S.ByteString -> L.ByteString
    toLazyBS = L.fromChunks . (:[])
    {-# INLINE toLazyBS #-}

    toStrictBS :: L.ByteString -> S.ByteString
    toStrictBS = S.concat . L.toChunks
    {-# INLINE toStrictBS #-}

Now we have the code for converting the Get monad used by protocol buffers into an iteratee. This should be correct, and it's pretty straightforward.

    -- | Convert a 'G.Result' iteratee state into a 'I.IterGV'
    -- iteratee state.
    result2iterv
        :: (Monad m)
        => G.Result a
        -> I.IterGV WrappedByteString Word8 m a
    result2iterv (G.Finished rest _ a) = idone a (WrapBS $ toStrictBS rest)
    result2iterv (G.Failed _ msg)      = ifail msg
    result2iterv (G.Partial k)         = I.Cont (iterify k) Nothing

    -- | Convert a protobuf-style continuation into an
    -- iteratee-style continuation.
    iterify
        :: (Monad m)
        => (Maybe L.ByteString -> G.Result a)
        -> I.IterateeG WrappedByteString Word8 m a
    iterify k =
        I.IterateeG $ \s -> return $!
            case s of
                I.Chunk (WrapBS xs) -> result2iterv $ k (Just $ toLazyBS xs)
                I.EOF Nothing       -> result2iterv $ k Nothing
                I.EOF (Just err)    -> ierror err

    -- | A variant of 'G.runGet' as an iteratee.
    runGetIteratee
        :: (Monad m, R.ReflectDescriptor a, W.Wire a)
        => G.Get a
        -> I.IterateeG WrappedByteString Word8 m a
    runGetIteratee g =
        I.IterateeG $ \s -> return $!
            case s of
                I.Chunk (WrapBS xs) -> result2iterv $ G.runGet g (toLazyBS xs)
                I.EOF Nothing       -> result2iterv $ G.runGet g L.empty
                I.EOF (Just err)    -> ierror err

Okay, now we have an iteratee which consumes a stream of bytestrings and will render a protocol buffer message. But what we really want is an enumeratee to do this repeatedly so we can use an iteratee to consume the stream of messages. I have the following definition which typechecks, but doesn't seem to work. The call to convStream seems like it always hangs:

    -- | A variant of 'G.runGet' as an enumeratee.
    runGetEnumeratee
        :: (Monad m, R.ReflectDescriptor a, W.Wire a)
        => G.Get a
        -> I.EnumeratorN WrappedByteString Word8 [] a m b
    runGetEnumeratee =
        I.convStream . liftM (Just . (:[])) . runGetIteratee

Once we have a working definition of runGetEnumeratee, then we can define the specific enumeratee we need:

    -- | An enumeratee for converting bytestrings into protocol
    -- buffer messages.
    messageWithLengthEnumeratee
        :: forall m msg a
        .  (Monad m, R.ReflectDescriptor msg, W.Wire msg)
        => I.EnumeratorN WrappedByteString Word8 [] msg m a
    messageWithLengthEnumeratee =
        runGetEnumeratee (W.messageWithLengthGetM :: G.Get msg)

And then at the use site we have the following:

    let processRequest = ... :: msg -> IO ()

    I.run                             -- run the [()], sending EOF
        . I.joinIM                    -- push monadic effects inside
        . I.enumFdFollow fifo_in      -- read from the Fd forever
        . I.joinI                     -- when EOF bytestrings, EOF msgs
        . messageWithLengthEnumeratee -- ByteStrings -> messages
        $ I.mapM_ processRequest      -- process messages

I think this part is correct too, but just to be sure... the goal is that we should read bytestrings from the FIFO forever (or until the other process closes their end), and then we read off the messages one by one, handing them off to processRequest to interpret them and respond accordingly.

When I put this all together, the process is killed with:

    control message: Just (Err "endOfInput")

Data.Iteratee.Base.run is the origin of the "control message:" part of the error, but I don't know where (Err "endOfInput") is coming from since Data.Iteratee.Base only uses (Err "EOF") or (Err "Divergent Iteratee"). I believe runGetEnumeratee is where the problem is, though it could also be the use site or something in one of the libraries. Any help would be appreciated.

[1] http://hackage.haskell.org/package/protocol-buffers
    http://hackage.haskell.org/package/hprotoc

-- Live well, ~wren

I think the problem is that the iteratee you give to I.convStream always returns Just [something] while you should return Nothing on EOF. Suppose you want to have an enumeratee that adds 1 to each integer in the stream and then use stream2list to get an iteratee that consumes the result stream and returns it as a list:
    let iter = joinI $ (convStream (head >>= return . Just . (:[]) . (+1))) stream2list :: IterateeG [] Int IO [Int]
    run iter
    *** Exception: control message: Just (Err "EOF")

Note that run simply passes EOF to iter and extracts the result. Instead of throwing an error, the code above should produce [] (i.e. no stream to consume, no elements in the list). This can be fixed by checking whether the stream is empty:

    let iter = joinI $ (convStream (isFinished >>= maybe (head >>= return . Just . (:[]) . (+1)) (\_ -> return Nothing))) stream2list :: IterateeG [] Int IO [Int]
    run iter
    []

I think you should do the same in your code:

    runGetEnumeratee get =
        I.convStream $ I.isFinished >>= maybe convIter (\_ -> return Nothing)
        where
        convIter = (Just . return) `liftM` (runGetIteratee get)

When the stream is not empty, it runs (runGetIteratee get) and returns
its result wrapped in Just . (:[]). When the stream is empty, it
returns Nothing so convStream knows it is done.
-- Maciej
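The termination rule described above can be sketched without the iteratee package. This is a toy model only: plain lists stand in for streams, and `Converter`, `convList`, `naive` and `checked` are hypothetical names, not part of iteratee 0.3. It only illustrates why the converter has to return Nothing at end of input instead of trying to read another element.

```haskell
-- Toy model: lists stand in for streams. This is NOT the iteratee API.

-- | A converter inspects the remaining input and either fails, signals
-- that it is finished (Nothing), or yields a batch of outputs together
-- with the unconsumed input (Just).
type Converter a b = [a] -> Either String (Maybe ([b], [a]))

-- | Run a converter repeatedly until it returns Nothing,
-- concatenating the batches it produces.
convList :: Converter a b -> [a] -> Either String [b]
convList step = go
  where
    go input = do
      r <- step input
      case r of
        Nothing          -> Right []
        Just (out, rest) -> (out ++) <$> go rest

-- | Always tries to read one element, so it fails once the input runs
-- out (the analogue of always returning Just [something]).
naive :: Converter Int Int
naive []       = Left "EOF"
naive (x : xs) = Right (Just ([x + 1], xs))

-- | Checks for end of input first and returns Nothing there.
checked :: Converter Int Int
checked [] = Right Nothing
checked xs = naive xs
```

In this model `convList naive` fails with "EOF" on every finite input, including the empty one, while `convList checked` returns the incremented list.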

On 2/2/11 11:25 PM, Maciej Wos wrote:
> I think the problem is that the iteratee you give to I.convStream always returns Just [something] while you should return Nothing on EOF.

That makes sense for the hanging problem (which I only noticed during debugging). Though I still get the same error message when running the whole program...
-- Live well, ~wren

On 2/2/11 20:06, wren ng thornton wrote:
> When I put this all together, the process is killed with:
> control message: Just (Err "endOfInput")

POSIX FIFOs and GHC's nonblocking file descriptors implementation don't play well together; you should launch the writer end first and let it block waiting for the reader, or you should switch to opening the FIFO r/w and add a control message for end-of-stream (the usual way to work with FIFOs).

-- brandon s. allbery [linux,solaris,freebsd,perl] allbery.b@gmail.com
system administrator [openafs,heimdal,too many hats] kf8nh
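As a rough illustration of the "control message for end-of-stream" idea, here is a minimal sketch of a framing scheme with an explicit end marker. Everything in it is an assumption made for illustration: the tag values, the one-byte length prefix, and the names `encodeData`, `encodeEnd` and `decodeFrames` are hypothetical and are not the protocol-buffers wire format or anything from the code in this thread.

```haskell
import Data.Word (Word8)

-- Hypothetical frame tags, chosen only for this sketch.
tagData, tagEnd :: Word8
tagData = 0
tagEnd  = 1

-- | Encode a payload as: data tag, one-byte length, payload bytes.
-- Assumes the payload is shorter than 256 bytes.
encodeData :: [Word8] -> [Word8]
encodeData payload = tagData : fromIntegral (length payload) : payload

-- | The end-of-stream control frame is a lone end tag.
encodeEnd :: [Word8]
encodeEnd = [tagEnd]

-- | Decode frames until the end marker is seen. Running out of bytes
-- before the marker is reported as an error rather than as a normal end.
decodeFrames :: [Word8] -> Either String [[Word8]]
decodeFrames [] = Left "input ended without an end-of-stream frame"
decodeFrames (t : rest)
  | t == tagEnd  = Right []
  | t == tagData = case rest of
      [] -> Left "truncated frame: missing length"
      (n : body) ->
        let len              = fromIntegral n
            (payload, rest') = splitAt len body
        in if length payload < len
             then Left "truncated frame: short payload"
             else (payload :) <$> decodeFrames rest'
  | otherwise = Left ("unknown frame tag " ++ show t)
```

With a marker like this the reader no longer depends on the FIFO reporting EOF to know that the writer is done, which is what makes it safe to keep the FIFO open read/write.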

On 2/3/11 19:16, Brandon S Allbery KF8NH wrote:
> POSIX FIFOs and GHC's nonblocking file descriptors implementation don't play well together; you should launch the writer end first and let it block

More specifically, I think what's happening here is that a non-blocking open() of a FIFO returns with the fd not actually open yet, a situation which isn't expected, and a blocking open will block until the other side is opened.

-- brandon s. allbery [linux,solaris,freebsd,perl] allbery.b@gmail.com
system administrator [openafs,heimdal,too many hats] kf8nh

On 2/3/11 7:19 PM, Brandon S Allbery KF8NH wrote:
> More specifically, I think what's happening here is that a non-blocking open() of a FIFO returns with the fd not actually open yet, a situation which isn't expected, and a blocking open will block until the other side is opened.

When opening the Fd[1], the program does block until the other end is opened by another process (verified via printf debugging). But I'll keep that in mind while digging around. I was aware of semi-closed handles, but not semi-open Fds.

[1] Via (System.Posix.IO.openFd file System.Posix.IO.ReadOnly Nothing System.Posix.IO.defaultFileFlags). Unfortunately, if compiled under GHC <= 6.12.1 with -threaded, then openFd will always throw an error about the system call being interrupted by GHC's thread scheduling timer. This bug is fixed in unix-2.4.1.0 which, IIRC, requires GHC-7. The problem is that the earlier implementation of openFd uses throwErrnoPathIfMinus1, instead of throwErrnoPathIfMinus1Retry (which does not exist in base <= 4.3.0.0). Cf., http://www.haskell.org/pipermail/glasgow-haskell-users/2009-December/018147....

-- Live well, ~wren
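The difference between the two error-handling helpers mentioned in the footnote comes down to whether an interrupted call is retried or reported. A minimal sketch of that retry loop follows. It simulates the system call instead of making one, and `Attempt`, `retryInterrupted` and `flakyOpen` are hypothetical names used only here; this is not how the unix package implements openFd.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Outcome of one attempt at a (simulated) system call.
data Attempt a
  = Interrupted    -- ^ the call was interrupted and may simply be re-issued
  | Failed String  -- ^ a genuine failure
  | Succeeded a

-- | Re-run the action for as long as it reports 'Interrupted'; stop on
-- success or on any other failure.
retryInterrupted :: IO (Attempt a) -> IO (Either String a)
retryInterrupted act = do
  r <- act
  case r of
    Interrupted -> retryInterrupted act
    Failed msg  -> return (Left msg)
    Succeeded a -> return (Right a)

-- | Hypothetical stand-in for an open call: it is interrupted as many
-- times as the counter says, then yields a made-up descriptor number 3.
flakyOpen :: IORef Int -> IO (Attempt Int)
flakyOpen remaining = do
  n <- readIORef remaining
  if n > 0
    then modifyIORef' remaining (subtract 1) >> return Interrupted
    else return (Succeeded 3)
```

A non-retrying wrapper would turn the first `Interrupted` into an error, which matches the symptom described for openFd under -threaded.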

I managed to track down the problem at last. And, as might be expected after staring at it for so long, it was a fairly boneheaded thing.

Turns out the error was from an entirely different thread which is using Attoparsec and a modified version of attoparsec-iteratee. I never suspected this thread because I'd previously verified the correctness of the parser (which is indeed correct). The problem is a coworker changed the format of the file that thread was reading, and I'd never bothered to give the parser robust and helpful error messages <chagrin>

Thanks all, for helping with the latent bugs in the code I posted and for convincing me that I really wasn't just missing something there.

-- Live well, ~wren
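For what "robust and helpful error messages" might look like in practice, here is a small sketch of a line-oriented parser that reports where it failed, what it expected and what it found. The "key=digits" file format and the names `ParseError`, `parseRecord` and `parseFile` are invented for illustration; the actual file format and the Attoparsec parser from this thread are not shown in the posts.

```haskell
import Data.Char (isDigit)

-- | A parse error that records the line number, what was expected,
-- and the offending text.
data ParseError = ParseError
  { errLine     :: Int
  , errExpected :: String
  , errFound    :: String
  } deriving (Eq, Show)

-- | Parse one record of the hypothetical form "key=digits". The line
-- number is passed in only so that errors can report it.
parseRecord :: Int -> String -> Either ParseError (String, Int)
parseRecord n line =
  case break (== '=') line of
    (key, '=' : val)
      | null key ->
          Left (ParseError n "non-empty key before '='" (show line))
      | not (null val) && all isDigit val ->
          Right (key, read val)
      | otherwise ->
          Left (ParseError n "decimal digits after '='" (show val))
    _ -> Left (ParseError n "'=' separating key and value" (show line))

-- | Parse every line of the input, stopping at the first bad record.
parseFile :: String -> Either ParseError [(String, Int)]
parseFile = mapM (uncurry parseRecord) . zip [1 ..] . lines
```

An error such as `ParseError 2 "'=' separating key and value" "\"b:22\""` points straight at a changed file format, whereas a bare "endOfInput" does not say which parser or which input was involved.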