
I think the problem is that the iteratee you give to I.convStream always returns Just [something] while you should return Nothing on EOF. Suppose you want to have an enumeratee that adds 1 to each integer in the stream and then use stream2list to get an iteratee that consumes the result stream and returns it as a list:
let iter = joinI $ (convStream (head >>= return . Just . (:[]) . (+1))) stream2list :: IterateeG [] Int IO [Int] run iter *** Exception: control message: Just (Err "EOF")
Note that run simply passes EOF to iter and extracts the result. Instead of throwing an error the code above should produce an [] (i.e. no stream to consume, no elements in the list). This can be fixed by checking whether the stream is empty:
let iter = joinI $ (convStream (isFinished >>= maybe (head >>= return . Just . (:[]) . (+1)) (\_ -> return Nothing))) stream2list :: IterateeG [] Int IO [Int] run iter []
I think you should do the same in your code:
runGetEnumeratee get =
I.convStream $ isFinished >>= maybe convIter (\_ -> return Nothing)
where
convIter = (Just . return) `liftM` (runGetIteratee get)
When the stream is not empty, it runs (runGetIteratee get) and returns
its result wrapped in Just . (:[]). When the stream is empty, it
returns Nothing so convStream knows it is done.
-- Maciej
On Thu, Feb 3, 2011 at 10:06 AM, wren ng thornton
I'm working on a project that's using John Lato's old implementation of iteratees (iteratee >= 0.3.5 && < 0.4; I'm hoping to migrate to 0.7 soon, but that's a ways off yet) and I'm running into some issues I haven't been able to untangle. Maybe a new set of eyes can help...
The overarching program brings three things together for doing some interprocess communication: the medium is Posix FIFOs, the messages themselves are encoded with Google's Protocol Buffers[1], and the control flow for getting and processing the messages is handled by iteratees. The error message indicates iteratees are at fault, though it could be an error elsewhere instead.
First, some boilerplate.
-- For messageWithLengthEnumeratee only {-# LANGUAGE ScopedTypeVariables #-}
import qualified Text.ProtocolBuffers.Reflections as R import qualified Text.ProtocolBuffers.WireMessage as W import qualified Text.ProtocolBuffers.Get as G import qualified Data.ByteString as S import qualified Data.ByteString.Lazy as L import qualified Data.Iteratee as I import Data.Iteratee.WrappedByteString import Data.Word (Word8) import Control.Monad (liftM)
-- | Return a final value, and the remainder of the stream. idone :: a -> c el -> I.IterGV c el m a idone a xs = I.Done a (I.Chunk xs) {-# INLINE idone #-}
-- | Convert a continuation into 'I.IterGV'. icontinue :: (I.StreamG c el -> m (I.IterGV c el m a)) -> I.IterGV c el m a icontinue k = I.Cont (I.IterateeG k) Nothing {-# INLINE icontinue #-}
-- | Throw an error message. ifail :: (Monad m) => String -> I.IterGV c el m a ifail msg = ierror (I.Err msg) {-# INLINE ifail #-}
-- | An 'I.IterGV' variant of 'I.throwErr'. ierror :: (Monad m) => I.ErrMsg -> I.IterGV c el m a ierror err = I.Cont (I.throwErr err) (Just err) {-# INLINE ierror #-}
toLazyBS :: S.ByteString -> L.ByteString toLazyBS = L.fromChunks . (:[]) {-# INLINE toLazyBS #-}
toStrictBS :: L.ByteString -> S.ByteString toStrictBS = S.concat . L.toChunks {-# INLINE toStrictBS #-}
Now we have the code for converting the Get monad used by protocol buffers into an iteratee. This should be correct, and it's pretty straightforward.
-- | Convert a 'G.Result' iteratee state into a 'I.IterGV' -- iteratee state. result2iterv :: (Monad m) => G.Result a -> I.IterGV WrappedByteString Word8 m a result2iterv (G.Finished rest _ a) = idone a (WrapBS $ toStrictBS rest) result2iterv (G.Failed _ msg) = ifail msg result2iterv (G.Partial k) = I.Cont (iterify k) Nothing
-- | Convert a protobuf-style continuation into an -- iteratee-style continuation. iterify :: (Monad m) => (Maybe L.ByteString -> G.Result a) -> I.IterateeG WrappedByteString Word8 m a iterify k = I.IterateeG $ \s -> return $! case s of I.Chunk (WrapBS xs) -> result2iterv $ k (Just $ toLazyBS xs) I.EOF Nothing -> result2iterv $ k Nothing I.EOF (Just err) -> ierror err
-- | A variant of 'G.runGet' as an iteratee. runGetIteratee :: (Monad m, R.ReflectDescriptor a, W.Wire a) => G.Get a -> I.IterateeG WrappedByteString Word8 m a runGetIteratee g = I.IterateeG $ \s -> return $! case s of I.Chunk (WrapBS xs) -> result2iterv $ G.runGet g (toLazyBS xs) I.EOF Nothing -> result2iterv $ G.runGet g L.empty I.EOF (Just err) -> ierror err
Okay, now we have an iteratee which consumes a stream of bytestrings and will render a protocol buffer message. But what we really want is an enumeratee to do this repeatedly so we can use an iteratee to consume the stream of messages. I have the following definition which typechecks, but doesn't seem to work. The call to convStream seems like it always hangs:
-- | A variant of 'G.runGet' as an enumeratee. runGetEnumeratee :: (Monad m, R.ReflectDescriptor a, W.Wire a) => G.Get a -> I.EnumeratorN WrappedByteString Word8 [] a m b runGetEnumeratee = I.convStream . liftM (Just . (:[])) . runGetIteratee
Once we have a working definition of runGetEnumeratee, then we can define the specific enumeratee we need:
-- | An enumeratee for converting bytestrings into protocol -- buffer messages. messageWithLengthEnumeratee :: forall m msg a . (Monad m, R.ReflectDescriptor msg, W.Wire msg) => I.EnumeratorN WrappedByteString Word8 [] msg m a messageWithLengthEnumeratee = runGetEnumeratee (W.messageWithLengthGetM :: G.Get msg)
And then at the use site we have the following:
let processRequest = ... :: msg -> IO () I.run -- run the [()], sending EOF . I.joinIM -- push monadic effects inside . I.enumFdFollow fifo_in -- read from the Fd forever . I.joinI -- when EOF bytestrings, EOF msgs . messageWithLengthEnumeratee -- ByteStrings -> messages $ I.mapM_ processRequest -- process messages
I think this part is correct too, but just to be sure... the goal is that we should read bytestrings from the FIFO forever (or until the other process closes their end), and then we read off the messages one by one, handing them off to processRequest to interpret them and respond accordingly.
When I put this all together, the process is killed with:
control message: Just (Err "endOfInput")
Data.Iteratee.Base.run is the origin of the "control message:" part of the error, but I don't know where (Err "endOfInput") is coming from since Data.Iteratee.Base only uses (Err "EOF") or (Err "Divergent Iteratee"). I believe runGetEnumeratee is where the problem is, though it could also be the use site or something in one of the libraries. Any help would be appreciated.
[1] http://hackage.haskell.org/package/protocol-buffers http://hackage.haskell.org/package/hprotoc
-- Live well, ~wren
_______________________________________________ Haskell-Cafe mailing list Haskell-Cafe@haskell.org http://www.haskell.org/mailman/listinfo/haskell-cafe