
Hi,

I have run across the following idiom several times in the last few weeks: Some part of my program reads input from a 'Handle' and repeatedly calls a "stream processor" to incrementally compute some result, say an SHA1 hash, for example. All my stream processors can be reduced to the following interface:

    import Foreign ( Ptr, Word8 )

    type Buffer = (Ptr Word8, Int)

    data StreamProc ctx a = SP
      { start  :: IO ctx
      , feed   :: ctx -> Buffer -> IO ctx
      , commit :: ctx -> IO a
      }

I have to initialize some SP-specific context, then I can feed the data into a stateful computation as it comes in, and after receiving EOF I finalize the SP to obtain the result I wanted.

The StreamProc interface works nicely for all kinds of things, be it message digests, encryption, encoding, or parsers. And obviously, it's a rather generic concept. One that doesn't have to be re-invented every time ...

Which leads to my question: Is there any generic module available that implements such a SP? Or something similar?

I know the stream processors as described in Hughes' paper about Arrows, but those are pure stream processors -- they don't allow for I/O, which I need to handle the Ptr.

Does anyone have a recommendation for me?

Peter
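A minimal sketch of how the interface above might be driven from a 'Handle'. The names byteSum, runHandle and sumFile are hypothetical and not part of the original post; the byte-summing processor is only a stand-in for a real digest such as SHA1.

```haskell
import Foreign (Ptr, Word8, allocaBytes, peekArray)
import System.IO (Handle, IOMode (ReadMode), hGetBuf, withBinaryFile)

type Buffer = (Ptr Word8, Int)

data StreamProc ctx a = SP
  { start  :: IO ctx
  , feed   :: ctx -> Buffer -> IO ctx
  , commit :: ctx -> IO a
  }

-- Hypothetical example processor: adds up all bytes it is fed.
byteSum :: StreamProc Int Int
byteSum = SP
  { start  = return 0
  , feed   = \acc (ptr, n) -> do
      bytes <- peekArray n ptr
      return $! acc + sum (map fromIntegral bytes)
  , commit = return
  }

-- Hypothetical driver: read chunks of at most bufSize bytes until EOF,
-- feed each chunk to the processor, then commit.
runHandle :: Int -> Handle -> StreamProc ctx a -> IO a
runHandle bufSize h sp =
  allocaBytes bufSize $ \ptr -> do
    let loop ctx = do
          n <- hGetBuf h ptr bufSize   -- returns 0 at end of file
          if n == 0
            then commit sp ctx
            else feed sp ctx (ptr, n) >>= loop
    start sp >>= loop

-- Usage sketch: sum the bytes of a file.
sumFile :: FilePath -> IO Int
sumFile path = withBinaryFile path ReadMode $ \h -> runHandle 4096 h byteSum
```

The driver only ever passes the most recent context to the next call, which is the usage pattern the interface seems to assume.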

Peter Simons wrote:
    type Buffer = (Ptr Word8, Int)

    data StreamProc ctx a = SP
      { start  :: IO ctx
      , feed   :: ctx -> Buffer -> IO ctx
      , commit :: ctx -> IO a
      }
Must contexts be used in a single-threaded manner? If so, I would expect this interface:

    start  :: IO ctx
    feed   :: ctx -> Buffer -> IO ()
    commit :: ctx -> IO a

If not, I would expect this interface:

    start  :: ctx
    feed   :: ctx -> Buffer -> IO ctx
    commit :: ctx -> a

Additionally, I don't think (Ptr Word8, Int) is general enough for all reasonable uses of this interface. For example, there's nothing inherently unsafe about calculating the MD5 hash of the contents of an STUArray or UArray:

    feedBuffer   :: ctx -> Buffer -> IO ctx
    feedSTUArray :: ctx -> STUArray s Int Word8 -> ST s ctx
    feedUArray   :: ctx -> UArray Int Word8 -> ctx

And what about a subrange of an STUArray or UArray?

These are the problems I ran into when trying to produce a useful MD5 library for Haskell. I feel the same way as you -- that there should be a standard way of doing this -- but I don't think the three functions you propose are nearly enough.

-- Ben

Ben Rudiak-Gould writes:
Must contexts be used in a single-threaded manner? If so, I would expect this interface:
    start  :: IO ctx
    feed   :: ctx -> Buffer -> IO ()
    commit :: ctx -> IO a
'feed' cannot have this signature because it needs to update the context.
If not, I would expect this interface:
    start  :: ctx
    feed   :: ctx -> Buffer -> IO ctx
    commit :: ctx -> a
Both 'start' and 'commit' need to be in the IO monad, because creating and finalizing the context may involve IO calls. (Just think of a computation that does internal buffering in memory which is accessed through another Ptr.)
Additionally, I don't think (Ptr Word8, Int) is general enough for all reasonable uses of this interface.
That's true. I used this because I have the data in a memory buffer already, so this is the API with the best performance for me because it handles the input without marshaling. I agree it would be nice to have something more general and generally smarter than my type definition -- that's why I asked on the list. :-)
    feedBuffer   :: ctx -> Buffer -> IO ctx
    feedSTUArray :: ctx -> STUArray s Int Word8 -> ST s ctx
    feedUArray   :: ctx -> UArray Int Word8 -> ctx
I would implement feedSTUArray and friends as wrappers around the Ptr interface, not as primitive computations of the stream processor. But nonetheless, having those would be nice. Peter

Peter Simons wrote:
Ben Rudiak-Gould writes:
Must contexts be used in a single-threaded manner? If so, I would expect this interface:
    start  :: IO ctx
    feed   :: ctx -> Buffer -> IO ()
    commit :: ctx -> IO a
'feed' cannot have this signature because it needs to update the context.
Sure it can -- it's just like writeIORef :: IORef a -> a -> IO (). If the return of writeIORef were IO (IORef a) instead, it would be confusing: does it return the same IORef or a different one? If a different one, does the original one remain unchanged? If the same one, why bother returning it when the caller already had it? That's what confused me about your proposed interface.
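A small sketch of the writeIORef-style reading, assuming the context is a mutable handle that 'feed' updates in place. The record MutableSP and the byteCount processor are hypothetical names used only for illustration.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Foreign (Ptr, Word8, withArrayLen)

type Buffer = (Ptr Word8, Int)

-- Hypothetical record for the single-threaded interface:
-- feed returns (), so there is only ever one context to talk about.
data MutableSP ctx a = MutableSP
  { startM  :: IO ctx
  , feedM   :: ctx -> Buffer -> IO ()
  , commitM :: ctx -> IO a
  }

-- Example processor: counts the bytes it has seen, keeping the
-- running total in an IORef that is updated destructively.
byteCount :: MutableSP (IORef Int) Int
byteCount = MutableSP
  { startM  = newIORef 0
  , feedM   = \ref (_, n) -> modifyIORef' ref (+ n)
  , commitM = readIORef
  }

-- Usage sketch: feed two buffers of 3 and 2 bytes to the same context.
demoMutable :: IO Int
demoMutable = do
  ctx <- startM byteCount
  withArrayLen [1, 2, 3] $ \n p -> feedM byteCount ctx (p, n)
  withArrayLen [4, 5]    $ \n p -> feedM byteCount ctx (p, n)
  commitM byteCount ctx
```

Here demoMutable yields 5: both feeds visibly act on the one context, which is the point of the analogy.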
If not, I would expect this interface:
    start  :: ctx
    feed   :: ctx -> Buffer -> IO ctx
    commit :: ctx -> a
Both 'start' and 'commit' need to be in the IO monad, because creating and finalizing the context may involve IO calls. (Just think of a computation that does internal buffering in memory which is accessed through another Ptr.)
In this interface contexts are supposed to be immutable Haskell values, so there's no meaning in creating new ones or finalizing old ones. The initial empty context is just a value, and the final MD5 hash (or whatever) is a pure function of the final context. Yes, this would likely involve internal use of unsafePerformIO in the implementation, but that's what it's there for (a required part of the FFI). Dealing with reuse of state might also make the library too inefficient in practice, which would be a bigger problem.
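For comparison, a sketch of the value-semantics reading, assuming contexts are ordinary immutable values. PureSP and byteSumP are hypothetical names; a real digest would hide foreign state behind the same types.

```haskell
import Foreign (Ptr, Word8, peekArray, withArrayLen)

type Buffer = (Ptr Word8, Int)

-- Hypothetical record for the immutable-context interface:
-- only feed is in IO, because it has to read through the Ptr.
data PureSP ctx a = PureSP
  { startP  :: ctx
  , feedP   :: ctx -> Buffer -> IO ctx
  , commitP :: ctx -> a
  }

-- Example processor: the context is just the running byte sum.
byteSumP :: PureSP Int Int
byteSumP = PureSP
  { startP  = 0
  , feedP   = \acc (p, n) -> do
      bytes <- peekArray n p
      return $! acc + sum (map fromIntegral bytes)
  , commitP = id
  }

-- Usage sketch: the initial context is reused for two independent runs.
demoPure :: IO (Int, Int)
demoPure = do
  let ctx0 = startP byteSumP
  ctx1 <- withArrayLen [1, 2, 3] $ \n p -> feedP byteSumP ctx0 (p, n)
  ctx2 <- withArrayLen [10, 20]  $ \n p -> feedP byteSumP ctx0 (p, n)
  return (commitP byteSumP ctx1, commitP byteSumP ctx2)
```

Because ctx0 is never modified, demoPure yields (6, 30): each result depends only on the buffer fed to that branch.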
    feedBuffer   :: ctx -> Buffer -> IO ctx
    feedSTUArray :: ctx -> STUArray s Int Word8 -> ST s ctx
    feedUArray   :: ctx -> UArray Int Word8 -> ctx
I would implement feedSTUArray and friends as wrappers around the Ptr interface, not as primitive computations of the stream processor.
I think it's impossible to do this safely, but it would be great if I were wrong. -- Ben

Ben Rudiak-Gould writes:
    start  :: IO ctx
    feed   :: ctx -> Buffer -> IO ()
    commit :: ctx -> IO a
'feed' cannot have this signature because it needs to update the context.
Sure it can -- it's just like writeIORef :: IORef a -> a -> IO ().
I guess it's moot to argue that point. I don't want a stream processor to have a global state, so using an internally encapsulated IORef is not an option for me. I am looking for a more _general_ API, not one that forces implementation details on the stream processor. That's what my StreamProc data type does already. :-)
    start  :: ctx
    feed   :: ctx -> Buffer -> IO ctx
    commit :: ctx -> a
In this interface contexts are supposed to be immutable Haskell values, so there's no meaning in creating new ones or finalizing old ones.
I don't want to restrict the API to immutable contexts. A context could be anything, _including_ an IORef or an MVar. But the API shouldn't enforce that.
I would implement feedSTUArray and friends as wrappers around the Ptr interface, not as primitive computations of the stream processor.
I think it's impossible to do this safely, but it would be great if I were wrong.
    wrap :: (Storable a, MArray arr a IO) => Ptr a -> Int -> IO (arr Int a)
    -- n elements occupy the indices 0 .. n-1
    wrap ptr n = peekArray n ptr >>= newListArray (0,n-1)

Peter
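A self-contained sketch of the copying approach, assuming an IOUArray as the target. It uses the bounds (0, n - 1) so that the array holds exactly the n elements read from the pointer; demoWrap is a hypothetical helper added for illustration.

```haskell
{-# LANGUAGE FlexibleContexts #-}
import Data.Array.IO (IOUArray, MArray, getElems, newListArray)
import Foreign (Ptr, Storable, Word8, peekArray, withArrayLen)

-- Copy n elements out of foreign memory into a fresh mutable array.
-- The array does not alias the pointer; later writes to either side
-- are not visible to the other.
wrap :: (Storable a, MArray arr a IO) => Ptr a -> Int -> IO (arr Int a)
wrap ptr n = peekArray n ptr >>= newListArray (0, n - 1)

-- Usage sketch: marshal a list out to foreign memory, wrap it,
-- and read the elements back from the array.
demoWrap :: IO [Word8]
demoWrap = withArrayLen [7, 8, 9] $ \n p -> do
  arr <- wrap p n :: IO (IOUArray Int Word8)
  getElems arr
```

This goes from a Ptr to an array; it is safe precisely because it copies, which is also why it is slow for large buffers.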

Peter Simons wrote:
Ben Rudiak-Gould writes:
    start  :: IO ctx
    feed   :: ctx -> Buffer -> IO ()
    commit :: ctx -> IO a
'feed' cannot have this signature because it needs to update the context.
Sure it can -- it's just like writeIORef :: IORef a -> a -> IO ().
I guess it's moot to argue that point. I don't want a stream processor to have a global state, so using an internally encapsulated IORef is not an option for me.
I am looking for a more _general_ API, not one that forces implementation details on the stream processor. That's what my StreamProc data type does already. :-)
I'm not arguing about generality; I simply don't understand how your interface is supposed to be used. E.g.:

    do ctx <- start
       ctx1 <- feed ctx array1
       ctx2 <- feed ctx array2
       val1 <- commit ctx1
       val2 <- commit ctx2
       return (val1,val2)

Should this return (MD5 of array1, MD5 of array2), or (MD5 of array1+array2, MD5 of array1+array2), or cause a runtime error? Any of these three might be reasonable, but for your interface to be well-defined you need to stipulate which one is correct. Once you've decided which one is correct, there's no reason not to change the interface so that no one can misinterpret it. My two interfaces are only less general than yours in that they don't have multiple interpretations -- which is a good thing.

    start  :: ctx
    feed   :: ctx -> Buffer -> IO ctx
    commit :: ctx -> a
In this interface contexts are supposed to be immutable Haskell values, so there's no meaning in creating new ones or finalizing old ones.
I don't want to restrict the API to immutable contexts. A context could be anything, _including_ an IORef or an MVar. But the API shouldn't enforce that.
It doesn't. Even (length :: [a] -> Int) is likely to cause destructive updating of thunks when it's called, but that's not a reason to change the interface to [a] -> IO Int. The important thing is whether, from the caller's perspective, the function is pure. If it's pure, it shouldn't be in the IO monad, even if that forces some implementations to use unsafePerformIO under the hood.

I think you're hoping to have it both ways, capturing destructive-update semantics and value semantics in a single interface. That's not going to work, unfortunately. You must decide whether to enforce single-threading or not.
I would implement feedSTUArray and friends as wrappers around the Ptr interface, not as primitive computations of the stream processor.
I think it's impossible to do this safely, but it would be great if I were wrong.
    wrap :: (Storable a, MArray arr a IO) => Ptr a -> Int -> IO (arr Int a)
    -- n elements occupy the indices 0 .. n-1
    wrap ptr n = peekArray n ptr >>= newListArray (0,n-1)
Isn't this going in the wrong direction? I think what we want is something like

    withArrayPtr :: (MArray arr Word8 IO, Ix i) => arr i Word8 -> (Ptr Word8 -> IO a) -> IO a

You're right, though, this can be written safely:

    withArrayPtr arr act = getElems arr >>= flip withArray act

It's terribly slow, though. Ideally one wants a pointer into the original array together with a guarantee that it won't be moved by the garbage collector during the execution of your IO action. I think current versions of GHC will never move the array if your IO action performs no heap allocation, but I can easily imagine that changing in other/future implementations.

I suppose you could also have

    withArrayPtrM :: (MArray arr Word8 m, Ix i) => arr i Word8 -> (Ptr Word8 -> m b) -> m b
    withArrayPtrI :: (IArray arr Word8, Ix i) => arr i Word8 -> (Ptr Word8 -> IO b) -> IO b

though I'm not sure how much sense those types (or names) make. The first one would force the use of unsafeIOToST if you wanted to use it with ST arrays, but probably that's unavoidable.

-- Ben
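A runnable sketch of the safe-but-slow copying variant. It departs from the signature in the message in one labelled way: the hypothetical helper withArrayBuffer passes a (Ptr Word8, Int) pair so the action also learns the length. demoWithArray is likewise only an illustration.

```haskell
{-# LANGUAGE FlexibleContexts #-}
import Data.Array.IO (IOUArray, MArray, getElems, newListArray)
import Data.Ix (Ix)
import Foreign (Ptr, Word8, peekArray, withArrayLen)

type Buffer = (Ptr Word8, Int)

-- Copy the array's elements into temporary foreign memory and run the
-- action on that copy. Writes through the pointer are NOT reflected
-- in the original array, and the pointer must not escape the action.
withArrayBuffer :: (MArray arr Word8 IO, Ix i)
                => arr i Word8 -> (Buffer -> IO b) -> IO b
withArrayBuffer arr act = do
  elems <- getElems arr
  withArrayLen elems $ \n p -> act (p, n)

-- Usage sketch: sum the bytes of an unboxed IO array via the Ptr view.
demoWithArray :: IO Int
demoWithArray = do
  arr <- newListArray (0, 3) [1, 2, 3, 4] :: IO (IOUArray Int Word8)
  withArrayBuffer arr $ \(p, n) -> do
    bytes <- peekArray n p
    return (sum (map fromIntegral bytes))
```

The cost is one list traversal plus one copy per call, which matches the "terribly slow" caveat above.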

Ben Rudiak-Gould writes:
I'm not arguing about generality; I simply don't understand how your interface is supposed to be used. E.g.:
    do ctx <- start
       ctx1 <- feed ctx array1
       [...]

Note my original definition:

    type Buffer = (Ptr Word8, Int)

    data StreamProc ctx a = SP
      { start  :: IO ctx
      , feed   :: ctx -> Buffer -> IO ctx
      , commit :: ctx -> IO a
      }

So you would use it like this:

    foo :: StreamProc ctx a -> IO a
    foo sp = do
      ctx <- start sp
      (ptr,n) <- "read buffer"
      ctx' <- feed sp ctx (ptr,n)
      ...
      commit sp ctx'
The important thing is whether, from the caller's perspective, the function is pure. If it's pure, it shouldn't be in the IO monad [...]
My stream processors are not pure.
I think you're hoping to have it both ways, capturing destructive-update semantics and value semantics in a single interface. [...] You must decide whether to enforce single-threading or not.
Uh ... I am genuinely uncertain what single-threading versus multi-threading has to do with my API problem. The API should allow _both_, obviously, and I think it does. What I am trying to find is an _abstract_ API that has the same properties but doesn't depend on a specific data type. Right now I have the stream processor version of 'StateT', but I want the stream processor version of 'MonadState m' instead. Peter

Peter Simons wrote:
Note my original definition:
    type Buffer = (Ptr Word8, Int)

    data StreamProc ctx a = SP
      { start  :: IO ctx
      , feed   :: ctx -> Buffer -> IO ctx
      , commit :: ctx -> IO a
      }

So you would use it like this:

    foo :: StreamProc ctx a -> IO a
    foo sp = do
      ctx <- start sp
      (ptr,n) <- "read buffer"
      ctx' <- feed sp ctx (ptr,n)
      ...
      commit sp ctx'
Sorry, my mistake. I was thinking of StreamProc as a type class. What I meant was this:

    bar :: StreamProc ctx a -> IO (a,a)
    bar sp = do
      ctx <- start sp
      (ptr1,n1) <- ...
      (ptr2,n2) <- ...
      ctx1 <- feed sp ctx (ptr1,n1)
      ctx2 <- feed sp ctx (ptr2,n2)
      val1 <- commit sp ctx1
      val2 <- commit sp ctx2
      return (val1,val2)

My point is just that bar typechecks and therefore must do something at runtime; what does it do? This is a genuine question -- I'm hoping the answer will help me understand what you're trying to do here.

-- Ben

Ben Rudiak-Gould writes:
    bar :: StreamProc ctx a -> IO (a,a)
    bar sp = do
      ctx <- start sp
      (ptr1,n1) <- ...
      (ptr2,n2) <- ...
      ctx1 <- feed sp ctx (ptr1,n1)
      ctx2 <- feed sp ctx (ptr2,n2)
      val1 <- commit sp ctx1
      val2 <- commit sp ctx2
      return (val1,val2)
Ah! Now I understand what you meant with single-threaded versus multi-threaded use of the stream processor.

Well, in the general case the result would be undefined, because not every stream processor allows a context to be re-used. The SHA1 implementation I use, for example, has a context of Ptr Sha1Context ..., so both val1 and val2 would be the hash of the concatenation of both buffers. Which is not what you'd expect. A different implementation, on the other hand, might give you the hash of the first and second block respectively.

Hmmm. So I have these options:

(1) If you want to use _any_ stream processor, you must use it single-threadedly. If you use it multi-threadedly, you have to know what you're doing.

(2) Have distinct types (or classes) for stream processors that allow the context to be re-used and for those which do not.

So far, I've implicitly used (1).

Peter
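The ambiguity can be made concrete with a sketch. Assuming two hypothetical byte-counting processors, countPure (immutable context) and countShared (context is an IORef that 'feed' mutates and returns), the same 'bar' gives different answers. Concrete buffers stand in for the elided "..." reads.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Foreign (Ptr, Word8, withArrayLen)

type Buffer = (Ptr Word8, Int)

data StreamProc ctx a = SP
  { start  :: IO ctx
  , feed   :: ctx -> Buffer -> IO ctx
  , commit :: ctx -> IO a
  }

-- Hypothetical processor with an immutable context.
countPure :: StreamProc Int Int
countPure = SP
  { start  = return 0
  , feed   = \acc (_, n) -> return (acc + n)
  , commit = return
  }

-- Hypothetical processor whose context is shared mutable state,
-- similar in spirit to a Ptr to a C hashing context.
countShared :: StreamProc (IORef Int) Int
countShared = SP
  { start  = newIORef 0
  , feed   = \ref (_, n) -> modifyIORef' ref (+ n) >> return ref
  , commit = readIORef
  }

-- bar from the thread, with a 3-byte and a 2-byte buffer filled in.
bar :: StreamProc ctx a -> IO (a, a)
bar sp = do
  ctx  <- start sp
  ctx1 <- withArrayLen ([1, 2, 3] :: [Word8]) $ \n p -> feed sp ctx (p, n)
  ctx2 <- withArrayLen ([4, 5] :: [Word8])    $ \n p -> feed sp ctx (p, n)
  val1 <- commit sp ctx1
  val2 <- commit sp ctx2
  return (val1, val2)
```

With these definitions, bar countPure yields (3, 2) while bar countShared yields (5, 5); the record type alone does not tell a caller which behaviour to expect.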

On Thu, 21 Oct 2004, Peter Simons wrote:
(1) If you want to use _any_ stream processor, you must use it single-threadedly. If you use it multi-threadedly, you have to know what you're doing.
(2) Have distinct types (or classes) for stream processors that allow the context to be re-used and for those which do not.
(3) As per (2), but with one as a subclass of the other (cf Arrows and Monads, er, ArrowApply) -- flippa@flippac.org
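One possible shape for option (3), sketched with a type class and a law-only subclass. Everything here is an assumption made for illustration: the class names, the use of the TypeFamilies extension (which postdates this thread), and the Count example. The compiler does not check the reuse law; the subclass only documents it.

```haskell
{-# LANGUAGE TypeFamilies #-}
import Foreign (Ptr, Word8, withArrayLen)

type Buffer = (Ptr Word8, Int)

-- Any stream processor: callers must use each context at most once.
class StreamProc ctx where
  type Result ctx
  start  :: IO ctx
  feed   :: ctx -> Buffer -> IO ctx
  commit :: ctx -> IO (Result ctx)

-- Subclass with an unchecked law: feeding a context leaves the old
-- context valid, so it may be fed or committed again.
class StreamProc ctx => ReusableProc ctx

-- Hypothetical instance with an immutable context.
newtype Count = Count Int

instance StreamProc Count where
  type Result Count = Int
  start = return (Count 0)
  feed (Count k) (_, n) = return (Count (k + n))
  commit (Count k) = return k

instance ReusableProc Count

-- Branching is only offered for processors that promise reusability.
branch :: ReusableProc ctx
       => ctx -> Buffer -> Buffer -> IO (Result ctx, Result ctx)
branch ctx b1 b2 = do
  c1 <- feed ctx b1
  c2 <- feed ctx b2
  r1 <- commit c1
  r2 <- commit c2
  return (r1, r2)

demoBranch :: IO (Int, Int)
demoBranch = do
  ctx <- start :: IO Count
  withArrayLen ([1, 2, 3] :: [Word8]) $ \n1 p1 ->
    withArrayLen ([4, 5] :: [Word8]) $ \n2 p2 ->
      branch ctx (p1, n1) (p2, n2)
```

A processor backed by a Ptr to C state would get only the StreamProc instance, so a call to branch on it would be rejected at compile time.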

At 21 Oct 2004 16:48:57 +0200, Peter Simons wrote:
Hi,
I know the stream processors as described in Hughes' paper about Arrows, but those are pure stream processors -- they don't allow for I/O, which I need to handle the Ptr.
Here is some code I scraped off the net a while ago, though I can't seem to find the origin anymore. Not sure what the license is. Also, I think it might be a little buggy -- I have not used it much... However, it does show how to do IO in stream processing arrows...

    module ArrowStream where

    -----------------------------------------------------------------------------
    --
    -- Definition of continuation based stream processor as an arrow
    --
    -- Reference: Magnus Carlsson, Thomas Hallgren, "Fudgets--Purely Functional
    --            Processes with applications to Graphical User Interfaces",
    --            Department of Computing Science, Chalmers University of
    --            Technology, Goteborg University, Dissertation 1998
    --
    --            John Hughes, Generalising Monads to Arrows, November 10, 1998
    --
    -- History:   14-Aug-2002 Shawn Garbett, Creation
    --            01-Apr-2004 A
    --
    -------------------------------------------------------------------------------

    import Char
    import Control.Arrow
    import Control.Concurrent
    import Monad
    import System.IO

    data SP i o = Put o (SP i o)
                | Get (i -> SP i o)
                | Null
                | DoIO (IO (SP i o))

    instance Arrow SP where
      arr f = Get (\x -> Put (f x) (arr f))

      sp1 >>> Put c sp2 = Put c (sp1 >>> sp2)
      Put b sp1 >>> Get f = sp1 >>> f b
      Get f1 >>> Get f2 = Get (\a -> f1 a >>> Get f2)
      _ >>> Null = Null
      Null >>> Get _ = Null
      -- Process io downstream first
      sp >>> DoIO io = DoIO (Monad.liftM (sp >>>) io)
      -- Process io upstream next
      DoIO io >>> sp = DoIO (Monad.liftM (>>> sp) io)

      first f = bypass [] f
        where
          bypass ds (Get f) = Get (\(b,d) -> bypass (ds++[d]) (f b))
          bypass (d:ds) (Put c sp) = Put (c,d) (bypass ds sp)
          bypass [] (Put c sp) = Get (\(b,d) -> Put (c,d) (bypass [] sp))
          -- making it up...
          bypass ds (DoIO iosp) = DoIO (iosp >>= (\sp -> return (bypass ds sp)))

    instance ArrowZero SP where
      zeroArrow = Get (\x -> zeroArrow)

    instance ArrowPlus SP where
      Put b sp1 <+> sp2 = Put b (sp1 <+> sp2)
      sp1 <+> Put b sp2 = Put b (sp1 <+> sp2)
      Get f1 <+> Get f2 = Get (\a -> f1 a <+> f2 a)
      sp1 <+> (DoIO ioSP) = DoIO (ioSP >>= (\sp2 -> return (sp1 <+> sp2)))
      (DoIO ioSP) <+> sp2 = DoIO (ioSP >>= (\sp1 -> return (sp1 <+> sp2)))
      sp1 <+> Null = sp1
      Null <+> sp2 = sp2

    instance ArrowChoice SP where
      left (Put c sp) = Put (Left c) (left sp)
      left (Get f) = Get (\z -> case z of
                                  Left a -> left (f a)
                                  Right b -> Put (Right b) (left (Get f)))
      left (DoIO iosp) = DoIO (iosp >>= return . left)

    -- | Run the IO in a DoIO
    -- | the putStrLn's are just for debug...
    spIO :: Show o => SP i o -> IO ()
    spIO sp = case sp of
      Null -> putStrLn "Null" >> return ()
      Get _ -> putStrLn ("Get f") >> return ()
      Put n sp' -> putStrLn ("Put " ++ (show n)) >> spIO sp'
      DoIO io -> io >>= spIO
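To show how the DoIO constructor lets a stream processor perform I/O between steps, here is a small standalone sketch. It reuses only the SP data type from the posted module; the runner runSP and the tally processor are hypothetical additions, and the Arrow instances are left out.

```haskell
import Data.Char (toUpper)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

data SP i o = Put o (SP i o)
            | Get (i -> SP i o)
            | Null
            | DoIO (IO (SP i o))

-- Hypothetical runner: feed a list of inputs, run any embedded IO,
-- and collect the outputs. Stops at Null or when a Get finds no input.
runSP :: SP i o -> [i] -> IO [o]
runSP (Put o sp) is       = (o :) <$> runSP sp is
runSP (Get f)    (i : is) = runSP (f i) is
runSP (Get _)    []       = return []
runSP Null       _        = return []
runSP (DoIO io)  is       = io >>= \sp -> runSP sp is

-- Example processor: upper-cases each character and, as a side
-- effect in IO, counts how many characters it has processed.
tally :: IORef Int -> SP Char Char
tally ref = Get $ \c -> DoIO $ do
  modifyIORef' ref (+ 1)
  return (Put (toUpper c) (tally ref))

demoSP :: IO (String, Int)
demoSP = do
  ref <- newIORef 0
  out <- runSP (tally ref) "abc"
  n   <- readIORef ref
  return (out, n)
```

Here demoSP yields ("ABC", 3): the outputs come from Put, while the count is maintained purely through the IO actions wrapped in DoIO.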

Jeremy Shaw writes:
Here is some code I scraped off the net a while ago, though I can't seem to find the origin anymore.
I _think_ it is part of the Fudgets library:

    http://www.cs.chalmers.se/Cs/Research/Functional/Fudgets/

Thanks for posting it, though, this implementation is rather neat. Now I merely need to think of a way to apply this code to my problem. :-) Perhaps some more paper-reading is in order ...

Peter
participants (4): Ben Rudiak-Gould, Jeremy Shaw, Peter Simons, Philippa Cowderoy