Sending messages up-and-down the iteratee-enumerator chain [Was: iterIO-0.1]

David Mazie'res wrote:
What you really want is the ability to send both upstream and downstream control messages. Right now, I'd say iterIO has better support for upstream control messages, while iteratee has better support for downstream messages, since iteratee can just embed an Exception in a Stream. (I'm assuming you could have something like a 'Flush' exception to cause output to be flushed by an Iteratee that was for some reason buffering some.)
Can you explain how iteratee could keep track of the stream position? I'm not saying it's impossible, just that it's a challenging puzzle to make the types come out and I'd love to see the solution.
The code described in this message does exactly that. We illustrate enumerator's telling something to iteratees in the middle of the stream (to Flush their buffers) as well as iteratee's asking an enumerator of something (the stream position). The chunk of a stream and EOF are themselves `control' messages that an enumerator may send; the request for a new chunk, just like the request for a stream position, is just one of the requests an iteratee may ask. The set of messages an enumerator may send and the set of requests an iteratee may ask are both treated as open unions. We illustrate the explicit coding of open unions, to let the type checker ensure that what an iteratee may ask an enumerator can answer, and what an enumerator may tell an iteratee can understand. In process calculus lingo, we implement external (to iteratee) choice, internal choice, and a form of session types. For clarity, we implement a greatly simplified version of iteratees. We assume a single-character chunk, which an iteratee always consumes. Chunking of a stream and look-ahead are orthogonal concerns and have been discussed already. The stream represents the external, producer choice:
data Stream ie = Chunk Char -- the current character | SExc ie -- A message from the enumerator
The chunk is an ever-present option; other choices include EOF and Flush:
data EOF = EOF data Flush = Flush
The iteratee represents the internal, consumer choice:
data Iter ee ie a = Done a | Cont (Stream ie -> Iter ee ie a) | IExc (ee (Iter ee ie) a) -- other requests
Cont is a typical request from an iteratee. It is so typical that we wire it in (we could've treated it as other requests, like Tell). The iteratee is parametrised by what messages it understands and what requests it may ask. Iteratees compose as a monad:
instance Bindable ee => Monad (Iter ee ie) where return = Done Done a >>= f = f a Cont k >>= f = Cont (\x -> k x >>= f) IExc ee >>= f = IExc (comp ee f)
All requests must be bindable, so they can percolate
class Bindable ee where comp :: Monad m => ee m a -> (a -> m b) -> ee m b
An exception is a sort of request (especially if the exception is resumable)
data Err m a = Err (() -> m a)
instance Bindable Err where comp (Err k) f = Err (\x -> k x >>= f)
Another sort of request is to tell the position
data Tell m a = Tell (Int -> m a) instance Bindable Tell where comp (Tell k) f = Tell (\x -> k x >>= f)
We use Either (or higher-kinded E2) to build unions:
class Sum e c where inj :: e -> c prj :: c -> Maybe e
class Sum2 (e :: (* -> *) -> * -> *) (c :: (* -> *) -> * -> *) where inj2 :: e m a -> c m a prj2 :: c m a -> Maybe (e m a)
Iteratees are explicit in what they receive on the stream, the external choices they may handle. But they leave the requests polymorphic to ease composing with other iteratees which may asks more requests. Here is the simplest iteratee, which doesn't do anything but asks for trouble
ierr :: Sum2 Err c => Iter c ie a ierr = IExc . inj2 $ Err (\_ -> ierr)
A typical iteratee, like the head below, asks for little and accepts little:
iehead :: Sum2 Err c => Iter c EOF Char iehead = Cont step where step (Chunk a) = Done a step (SExc EOF) = ierr
We can ask for the current position:
itell :: Sum2 Tell c => Iter c ie Int itell = IExc . inj2 $ Tell Done
Enumerators, in contrast, are explicit in what requests they may satisfy, but implicit in what they may send on the stream. A typical, small enumerator requires that an iteratee understand at least EOF, and answers no requests beyond errors.
en_str :: Sum EOF ie => String -> Iter Err ie x -> Iter Err ie x en_str _ i@Done{} = i en_str _ (IExc x) | Just (Err _) <- prj2 x = ierr en_str "" (Cont k) = k eof en_str (h:t) (Cont k) = en_str t $ k (Chunk h)
A typical enumeratee, like the following keeper of positions, is explicit in requests it accepts: only Tell and Err. The Tell requests are satisfied and not propagated. The stream messages are relayed from the outer to the inner stream:
en_pos :: Int -> Iter (E2 Err Tell) ie x -> Iter Err ie x en_pos _ (Done x) = return x en_pos n (Cont k) = Cont (\s -> en_pos (n+1) (k s)) en_pos _ (IExc x) | Just (Err _) <- prj2 x = ierr en_pos n (IExc x) | Just (Tell k) <- prj2 x = en_pos n (k n)
Here are some of the examples:
t1 = irun $ en_str "x" iehead
-- Type error! en_str doesn't know how to handle Tell -- tb2 = irun $ en_str "x" itell
We interpose en_pos enumeratee to deal with positional requests, so we can run the whole example:
t5 = irun $ en_str "xab" $ en_pos 0 $ iter where iter = do x <- iehead y <- ietell return (x,y)
The complete code is available at http://okmij.org/ftp/Haskell/Iteratee/UpDown.hs

At Fri, 13 May 2011 02:57:38 -0700 (PDT), oleg@okmij.org wrote:
The code described in this message does exactly that.
Hey, Oleg. This is really cool! In particular, your Bindable class has the potential to unify a whole bunch of request types and both simplify and generalize code. Also, Sum is clearly a more elegant solution that just requiring everything to be Typeable. It may solve some problems I had where I wanted to send messages in exceptions that contained types I didn't know to be Typeable. I need to digest the code a bit more, but it may make sense for me to use this technique in a future version of iterIO. (Much of iterIO is obviously inspired by your stuff as it is.) However, I still have two questions. First, the Iter type in your message seems more like your first iteratee implementation, which is the approach iterIO and enumerator now take. I wonder if it's possible to implement something like Tell your current, CPS-based iteratee. Part of the reason I didn't take a CPS-based approach for Iter was that I couldn't get the upward control requests to work. (Also I wanted pure iteratees, which reduced the gain from CPS.) A challenge for Tell is that you need to know the size of buffered data and not move the input stream. So the control handler needs to decide what happens to residual data (since Seek does flush the input). Conceptually, it seems like it ought to be doable to pass residual data up and down the enumerator/iteratee stack in a CPS style. But when I try to represent residual input as something like: data Input r m s = forall a. Input ((Stream s -> Iteratee s m a) -> m r) I just can't get the types to work out. The second question is what happens to residual data for downstream requests. In the prototype code of your message, the Stream is over Chars, which are not a Monoid. In practice, you obviously want iteratees to be able to look arbitrarily far ahead--for instance an iteratee that returns a number of digits that is a multiple of 8 might have 8 characters of residual data (if the first 7 are digits). So what I'm stuck on is figuring out the right way to sequence the downstream requests with respect to the input data, particularly when you have enumeratees transcoding from one type to the other. Any thoughts? Thanks, David

Sorry, this is just a simple answer to one question:
However, I still have two questions. First, the Iter type in your message seems more like your first iteratee implementation, which is the approach iterIO and enumerator now take. I wonder if it's possible to implement something like Tell your current, CPS-based iteratee. Part of the reason I didn't take a CPS-based approach for Iter was that I couldn't get the upward control requests to work. (Also I wanted pure iteratees, which reduced the gain from CPS.)
Here are the differences from the file UpDown.hs (along the lines of IterateeMCPS.hs) newtype IterCPS ee ie a = -- non-monadic answer-type IterCPS{runIter :: forall r. (a -> r) -> ((Stream ie -> IterCPS ee ie a) -> r) -> (ee (IterCPS ee ie) a -> r) -> r} instance Bindable ee => Monad (IterCPS ee ie) where return x = IterCPS $ \ kd _ _ -> kd x IterCPS m >>= f = IterCPS $ \kd kc ke -> let m_done x = runIter (f x) kd kc ke m_cont g = kc (\s -> g s >>= f) m_iexc e = ke (comp e f) in m m_done m_cont m_iexc -- The simplest iteratee, which doesn't do anything but asks for trouble ierr :: Sum2 Err c => IterCPS c ie a ierr = IterCPS $ \_ _ ke -> ke . inj2 $ Err (\_ -> ierr) -- A small iteratee: asks for little and accepts little -- Return the current element iehead :: (Sum2 Err c, Bindable c) => IterCPS c EOF Char iehead = IterCPS $ \_ kc _ -> kc step where step (Chunk a) = return a step (SExc EOF) = ierr -- Ask for the current position itell :: (Sum2 Tell c, Bindable c) => IterCPS c ie Int itell = IterCPS $ \_ _ ke -> ke . inj2 $ Tell return -- check to see if the current character is 'a' and it occurs at pos 2 (1-based) ietell :: (Sum2 Err c, Sum2 Tell c, Bindable c) => IterCPS c EOF Bool ietell = IterCPS $ \_ kc _ -> kc step where step (Chunk 'a') = itell >>= return . (== 2) step (Chunk _) = return False step (SExc EOF) = ierr -- Like iehead, but accept the Flush message ieflush :: (Sum2 Err c, Bindable c) => IterCPS c (Either EOF Flush) Char ieflush = IterCPS $ \_ kc _ -> kc step where step (Chunk a) = return a step (SExc x) | Just EOF <- prj x = ierr step (SExc x) | Just Flush <- prj x = ieflush -- Enumerators and enumeratees -- Enumerators, in contrast, are explicit in what requests they may -- satisfy, but implicit in what they may send on the stream. -- Simple typical enumerator -- The iteratee must at least accept EOF -- The iteratee may return Err, but no other requests en_str :: Sum EOF ie => String -> IterCPS Err ie x -> IterCPS Err ie x en_str str i = runIter i kd kc ke where kd = return kc k = case str of "" -> k eof (h:t) -> en_str t $ k (Chunk h) ke x | Just (Err _) <- prj2 x = ierr -- A typical enumeratee -- It keeps the track of positions -- It is explicit in requests it accepts: only Tell and Err. -- It is polymorphic in the in-stream messages en_pos :: Int -> IterCPS (E2 Err Tell) ie x -> IterCPS Err ie x en_pos n i = runIter i kd kc ke where kd = return kc k = IterCPS $ \_ kc _ -> kc (\s -> en_pos (n+1) (k s)) ke x | Just (Err _) <- prj2 x = ierr ke x | Just (Tell k) <- prj2 x = en_pos n (k n) irun :: Sum EOF ie => IterCPS Err ie x -> x irun i = runIter i kd kc ke where kd x = x kc k = irun $ k eof ke _ = error "Iter error" The rest of the code, including the tests, are the same.
participants (2)
-
dm-list-haskell-cafe@scs.stanford.edu
-
oleg@okmij.org