
I'm working on a project that's using John Lato's old implementation of iteratees (iteratee >= 0.3.5 && < 0.4; I'm hoping to migrate to 0.7 soon, but that's a ways off yet) and I'm running into some issues I haven't been able to untangle. Maybe a new set of eyes can help...

The overarching program brings three things together for doing some interprocess communication: the medium is Posix FIFOs, the messages themselves are encoded with Google's Protocol Buffers[1], and the control flow for getting and processing the messages is handled by iteratees. The error message indicates iteratees are at fault, though it could be an error elsewhere instead.

First, some boilerplate.

    -- For messageWithLengthEnumeratee only
    {-# LANGUAGE ScopedTypeVariables #-}

    import qualified Text.ProtocolBuffers.Reflections as R
    import qualified Text.ProtocolBuffers.WireMessage as W
    import qualified Text.ProtocolBuffers.Get         as G
    import qualified Data.ByteString                  as S
    import qualified Data.ByteString.Lazy             as L
    import qualified Data.Iteratee                    as I
    import Data.Iteratee.WrappedByteString
    import Data.Word                                  (Word8)
    import Control.Monad                              (liftM)

    -- | Return a final value, and the remainder of the stream.
    idone :: a -> c el -> I.IterGV c el m a
    idone a xs = I.Done a (I.Chunk xs)
    {-# INLINE idone #-}

    -- | Convert a continuation into 'I.IterGV'.
    icontinue
        :: (I.StreamG c el -> m (I.IterGV c el m a))
        -> I.IterGV c el m a
    icontinue k = I.Cont (I.IterateeG k) Nothing
    {-# INLINE icontinue #-}

    -- | Throw an error message.
    ifail :: (Monad m) => String -> I.IterGV c el m a
    ifail msg = ierror (I.Err msg)
    {-# INLINE ifail #-}

    -- | An 'I.IterGV' variant of 'I.throwErr'.
    ierror :: (Monad m) => I.ErrMsg -> I.IterGV c el m a
    ierror err = I.Cont (I.throwErr err) (Just err)
    {-# INLINE ierror #-}

    toLazyBS :: S.ByteString -> L.ByteString
    toLazyBS = L.fromChunks . (:[])
    {-# INLINE toLazyBS #-}

    toStrictBS :: L.ByteString -> S.ByteString
    toStrictBS = S.concat . L.toChunks
    {-# INLINE toStrictBS #-}

Now we have the code for converting the Get monad used by protocol buffers into an iteratee. This should be correct, and it's pretty straightforward.

    -- | Convert a 'G.Result' iteratee state into a 'I.IterGV'
    -- iteratee state.
    result2iterv
        :: (Monad m)
        => G.Result a
        -> I.IterGV WrappedByteString Word8 m a
    result2iterv (G.Finished rest _ a) = idone a (WrapBS $ toStrictBS rest)
    result2iterv (G.Failed _ msg)      = ifail msg
    result2iterv (G.Partial k)         = I.Cont (iterify k) Nothing

    -- | Convert a protobuf-style continuation into an
    -- iteratee-style continuation.
    iterify
        :: (Monad m)
        => (Maybe L.ByteString -> G.Result a)
        -> I.IterateeG WrappedByteString Word8 m a
    iterify k =
        I.IterateeG $ \s -> return $!
            case s of
                I.Chunk (WrapBS xs) -> result2iterv $ k (Just $ toLazyBS xs)
                I.EOF Nothing       -> result2iterv $ k Nothing
                I.EOF (Just err)    -> ierror err

    -- | A variant of 'G.runGet' as an iteratee.
    runGetIteratee
        :: (Monad m, R.ReflectDescriptor a, W.Wire a)
        => G.Get a
        -> I.IterateeG WrappedByteString Word8 m a
    runGetIteratee g =
        I.IterateeG $ \s -> return $!
            case s of
                I.Chunk (WrapBS xs) -> result2iterv $ G.runGet g (toLazyBS xs)
                I.EOF Nothing       -> result2iterv $ G.runGet g L.empty
                I.EOF (Just err)    -> ierror err

Okay, now we have an iteratee which consumes a stream of bytestrings and will render a protocol buffer message. But what we really want is an enumeratee to do this repeatedly so we can use an iteratee to consume the stream of messages. I have the following definition which typechecks, but doesn't seem to work. The call to convStream seems like it always hangs:

    -- | A variant of 'G.runGet' as an enumeratee.
    runGetEnumeratee
        :: (Monad m, R.ReflectDescriptor a, W.Wire a)
        => G.Get a
        -> I.EnumeratorN WrappedByteString Word8 [] a m b
    runGetEnumeratee =
        I.convStream . liftM (Just . (:[])) . runGetIteratee

Once we have a working definition of runGetEnumeratee, then we can define the specific enumeratee we need:

    -- | An enumeratee for converting bytestrings into protocol
    -- buffer messages.
    messageWithLengthEnumeratee
        :: forall m msg a
        .  (Monad m, R.ReflectDescriptor msg, W.Wire msg)
        => I.EnumeratorN WrappedByteString Word8 [] msg m a
    messageWithLengthEnumeratee =
        runGetEnumeratee (W.messageWithLengthGetM :: G.Get msg)

And then at the use site we have the following:

    let processRequest = ... :: msg -> IO ()

    I.run                             -- run the [()], sending EOF
        . I.joinIM                    -- push monadic effects inside
        . I.enumFdFollow fifo_in      -- read from the Fd forever
        . I.joinI                     -- when EOF bytestrings, EOF msgs
        . messageWithLengthEnumeratee -- ByteStrings -> messages
        $ I.mapM_ processRequest      -- process messages

I think this part is correct too, but just to be sure... the goal is that we should read bytestrings from the FIFO forever (or until the other process closes their end), and then we read off the messages one by one, handing them off to processRequest to interpret them and respond accordingly.

When I put this all together, the process is killed with:

    control message: Just (Err "endOfInput")

Data.Iteratee.Base.run is the origin of the "control message:" part of the error, but I don't know where (Err "endOfInput") is coming from since Data.Iteratee.Base only uses (Err "EOF") or (Err "Divergent Iteratee"). I believe runGetEnumeratee is where the problem is, though it could also be the use site or something in one of the libraries. Any help would be appreciated.


[1] http://hackage.haskell.org/package/protocol-buffers
    http://hackage.haskell.org/package/hprotoc

-- 
Live well,
~wren
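
P.S. To make the intended behaviour concrete without pulling in either library, here is a toy model of what I want runGetEnumeratee to do. Everything in it is hypothetical and only for illustration: the Result type merely mirrors the shape of G.Result, the "wire format" is a list of Ints where a length n is followed by n payload elements, and message and decodeAll are names I made up. It assumes that end of stream between messages should be a clean stop, while end of stream in the middle of a message should be an error. I have not confirmed that this distinction is what goes wrong in the real program.

```haskell
module Main where

-- Toy resumable-parser result (hypothetical; only mirrors the shape of G.Result).
data Result a
  = Finished [Int] a                   -- leftover input and the parsed value
  | Partial (Maybe [Int] -> Result a)  -- wants more input; Nothing means EOF
  | Failed String

-- Parse one message in the toy format: a length n followed by n payload elements.
message :: [Int] -> Result [Int]
message [] = Partial (maybe (Failed "truncated message") message)
message (n : rest)
  | length rest >= n = let (body, left) = splitAt n rest in Finished left body
  | otherwise =
      Partial (maybe (Failed "truncated message")
                     (\more -> message (n : rest ++ more)))

-- Run the parser repeatedly over a stream of chunks, restarting on leftovers.
decodeAll :: [[Int]] -> Either String [[Int]]
decodeAll = start []
  where
    -- Between messages: nothing in progress, 'left' is unconsumed input.
    start left chunks
      | null left = case chunks of
          []       -> Right []          -- clean end of stream
          (c : cs) -> start c cs
      | otherwise = step (message left) chunks

    -- In the middle of a message: feed chunks until it finishes or fails.
    step (Finished left msg) chunks   = (msg :) <$> start left chunks
    step (Failed err)        _        = Left err
    step (Partial k)         []       = step (k Nothing) []
    step (Partial k)         (c : cs) = step (k (Just c)) cs

main :: IO ()
main = do
  print (decodeAll [[2, 10], [11, 1, 7]])  -- Right [[10,11],[7]]
  print (decodeAll [[3, 1]])               -- Left "truncated message"
  print (decodeAll [])                     -- Right []
```

The first call shows a message split across two chunks, with the leftover from the second chunk decoded as a further message. The second call shows the stream ending mid-message.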