
{- I've always thought that the essence of iteratees is just CPS (I have no desire to argue the merits of this statement, and I preemptively concede to anyone who disagrees with this), so I decided to experiment with a direct CPS version of streams. The resulting code turned out nicer than I'd expected, so I'm writing about it in case anyone else finds it interesting or instructive (it seems to me that Haskellers tend to shy away from CPS). I'm pretty sure this technique is not new, but a cursory search didn't turn up quite the same thing. I'd be interested in any pointers to this somewhere else, as well any general feedback about this. -} {-# LANGUAGE ScopedTypeVariables, TupleSections #-} import Control.Applicative hiding (empty) import Control.Arrow import Control.Exception import Control.Monad (unless) import Data.Attoparsec.Text import Data.Text import Data.Text.IO import Prelude hiding (catch, getLine, lines, length, null, putStrLn, readFile, splitAt) import qualified Prelude as P import System.IO (IOMode(..), hClose, hIsClosed, withFile) import System.IO.Error hiding (catch) {- A stream is a function which takes a stream continuation and gives a result. A stream continuation takes an input (the next element of the stream) and the rest of the stream and returns a result. If the continuation is given Nothing, then the stream it was given to is done. This intuition can be realized with the following type which will let us do compositional incremental input processing in a direct functional style. -} newtype Stream0CPS a r = Stream0CPS { stream0CPS :: Maybe (a, Stream0 a r) -> r } type Stream0 a r = Stream0CPS a r -> r -- Stream the lines of a file. -- The finally statement is not necessary, but it lets us observe when files are closed. fileStream0 :: FilePath -> Stream0 Text (IO r) fileStream0 fileName k = withFile fileName ReadMode $ \h -> do P.putStrLn $ "___opening "++fileName finally (go h k) (do isClosed <- hIsClosed h unless isClosed $ hClose h >> P.putStrLn ("___finally closing "++fileName) ) where go h k = stream0CPS k . fmap (, go h) =<< safeNextLine h safeNextLine h = catch (Just <$> hGetLine h) $ \e -> if isEOFError e then hClose h >> P.putStrLn ("___EOF closing "++fileName) >> return Nothing else ioError e -- Print elements of a stream. printStream0 :: Show a => Stream0 a (IO ()) -> IO () printStream0 xs = xs $ Stream0CPS $ maybe (P.putStrLn "NOTHING") (\(x, xs') -> P.putStrLn (show x) >> printStream0 xs') -- Append two streams together. appendStream0 :: Stream0 a r -> Stream0 a r -> Stream0 a r appendStream0 xs ys k = xs $ Stream0CPS cont where cont Nothing = ys k cont (Just (v, xs')) = stream0CPS k $ Just (v, appendStream0 xs' ys) -- Create a stream with only the first n elements of a given stream. takeStream0 :: Int -> Stream0 a r -> Stream0 a r takeStream0 n xs k | n > 0 = xs $ Stream0CPS $ stream0CPS k . fmap (id *** takeStream0 (n - 1)) | otherwise = stream0CPS k Nothing {- Suppose we have a file named testWords with the words "one" through "five" on separate lines, and a file called testNums with the numbers 1 though 5 on separate lines. Then we can do the following: *Main> printStream0 $ appendStream0 (fileStream0 "testWords") (fileStream0 "testNums") ___opening testWords "one" "two" "three" "four" "five" ___EOF closing testWords ___opening testNums "1" "2" "3" "4" "5" ___EOF closing testNums NOTHING which is as expected, however, we also see: *Main> printStream0 $ appendStream0 (takeStream0 3 $ fileStream0 "testWords") (fileStream0 "testNums") ___opening testWords "one" "two" "three" ___opening testNums "1" "2" "3" "4" "5" ___EOF closing testNums NOTHING ___finally closing testWords which is not ideal since we'd like testWords to be closed as soon as we're done with it. In order to accomplish this, we have to add a way to tell a stream to close down. This is easily accomplished by giving an extra argument to streams, which tells them what to do. While below we have only allowed for streams to continue and stop, there is no reason the StreamAction type, and the stream functions, couldn't be expanded to allow for dynamically changing stream behavior, e.g. a variable sized chunker, or a tunable filter. -} newtype StreamCPS a r = StreamCPS { streamCPS :: Maybe (a, Stream a r) -> r } type Stream a r = StreamAction -> StreamCPS a r -> r data StreamAction = Continue | Stop {- Now we just have to modify the previous functions to explain what to do when asked to stop. -} fileStream :: FilePath -> Stream Text (IO r) fileStream fileName action k = withFile fileName ReadMode $ \h -> do P.putStrLn $ "___opening "++fileName finally (go h action k) (do isClosed <- hIsClosed h unless isClosed $ hClose h >> P.putStrLn ("___finally closing "++fileName) ) where go h Continue k = streamCPS k . fmap (, go h) =<< safeNextLine h go h Stop k = hClose h >> P.putStrLn ("___Stop closing "++fileName) >> streamCPS k Nothing safeNextLine h = catch (Just <$> hGetLine h) $ \e -> if isEOFError e then hClose h >> P.putStrLn ("___EOF closing "++fileName) >> return Nothing else ioError e printStream :: Show a => Stream a (IO ()) -> IO () printStream xs = xs Continue $ StreamCPS $ maybe (P.putStrLn "NOTHING") (\(x, xs') -> P.putStrLn (show x) >> printStream xs') -- In order to stop appendStream xs ys, stop xs then stop ys appendStream :: Stream a r -> Stream a r -> Stream a r appendStream xs ys Stop k = xs Stop $ StreamCPS $ \_ -> ys Stop k appendStream xs ys Continue k = xs Continue $ StreamCPS cont where cont Nothing = ys Continue k cont (Just (v, xs')) = streamCPS k $ Just (v, appendStream xs' ys) takeStream :: Int -> Stream a r -> Stream a r takeStream _ xs Stop k = xs Stop k takeStream n xs Continue k | n > 0 = xs Continue $ StreamCPS $ streamCPS k . fmap (id *** takeStream (n - 1)) | otherwise = xs Stop k {- Now we get the desired behavior: *Main> printStream $ appendStream (takeStream 3 $ fileStream "testWords") (fileStream "testNums") ___opening testWords "one" "two" "three" ___Stop closing testWords ___opening testNums "1" "2" "3" "4" "5" ___EOF closing testNums NOTHING Here are a few more stream functions to give a feel for what sorts of things can easily be done. -} emptyStream :: Stream a r emptyStream _ k = streamCPS k Nothing getLineStream :: Stream Text (IO r) getLineStream Stop k = streamCPS k Nothing getLineStream Continue k = streamCPS k . Just . (, getLineStream) =<< getLine listToStream :: [a] -> Stream a r listToStream _ Stop k = streamCPS k Nothing listToStream [] Continue k = streamCPS k Nothing listToStream (x:xs) Continue k = streamCPS k $ Just (x, listToStream xs) -- Stream consumers should have a general enough return type to work with streams in a monad. countStream :: Monad m => Stream a (m Int) -> m Int countStream xs = xs Continue $ StreamCPS $ go 0 where go n (Just (_, xs')) = xs' Continue $ StreamCPS $ go $ n + 1 go n Nothing = return n consStream :: a -> Stream a r -> Stream a r consStream _ s Stop k = s Stop k consStream x s Continue k = streamCPS k $ Just (x, s) mapStream :: (a -> b) -> Stream a r -> Stream b r mapStream f xs action k = xs action $ StreamCPS $ streamCPS k . fmap (f *** mapStream f) filterStream :: (a -> Bool) -> Stream a r -> Stream a r filterStream p xs action k = xs action $ StreamCPS $ maybe (streamCPS k Nothing) (\(x, xs') -> if p x then streamCPS k $ Just (x, filterStream p xs') else filterStream p xs' Continue k) dropStream :: Int -> Stream a r -> Stream a r dropStream _ xs Stop k = xs Stop k dropStream n xs Continue k = xs Continue newK where newK | n > 0 = StreamCPS $ maybe (streamCPS k Nothing) (\(_, xs') -> dropStream (n - 1) xs' Continue k) | otherwise = k -- Round robin interleaving of a list of streams. mergeStreams :: [Stream a r] -> Stream a r mergeStreams [] _ k = streamCPS k Nothing mergeStreams (s:ss) Stop k = s Stop $ StreamCPS $ \_ -> mergeStreams ss Stop k mergeStreams (s:ss) Continue k = s Continue $ StreamCPS $ maybe (mergeStreams ss Continue k) $ \(v,s') -> streamCPS k $ Just (v, mergeStreams $ ss++[s']) -- Run an attoparsec parser over the given stream. -- Note the parser can consume partial and/or multiple lines from the stream to generate its result. attoStream :: forall a r . Parser a -> Stream Text (IO r) -> Stream a (IO r) attoStream p = go (parse p) where go :: (Text -> Result a) -> Stream Text (IO r) -> Stream a (IO r) go _ xs Stop k = xs Stop $ StreamCPS $ \_ -> streamCPS k Nothing go f xs Continue k = xs Continue $ StreamCPS $ maybe (streamCPS k Nothing) $ \(line, xs') -> case f line of Done rest v -> streamCPS k $ Just (v, attoStream p $ consStream rest xs') f'@(Partial _) -> go (feed f') xs' Continue k f'@(Fail _ _ctxs err) -> P.putStrLn ("___Atto Error: "++err) >> go (feed f') xs' Stop k

jeff p
I've always thought that the essence of iteratees is just CPS
for sure, at some level of abstraction this ought to be true, since CPS simulates call-by-value in a call-by-name language, cf. Gordon Plotkin: Call-by-Name, Call-by Value and the Lambda Calculus TCS , Vol. 1, pp. 125-159, http://homepages.inf.ed.ac.uk/gdp/publications/ and the purpose of iteratee is to "provide strict [...] I/O" http://hackage.haskell.org/package/iteratee-0.8.9.4 J.W.
participants (2)
-
jeff p
-
Johannes Waldmann