
[ moving to libraries@haskell.org from glasgow-haskell-users@haskell.org ]

Bulat Ziganshin wrote:
Wednesday, April 19, 2006, 4:45:19 PM, you wrote:
Believe me I've looked in detail at your streams library. Performance-wise it is great but the design needs to be reworked IMO.
The main problem is that it doesn't have enough type structure. There are many combinations of stream transformers that don't make sense, and should therefore be ruled out by the type system. There are operations that don't work on some streams. There should at the least be a type distinction between directly accessible memory streams, byte streams, and text streams. Additionally I would add separate classes for seekable and buffered streams. I believe these changes would improve performance by reducing the size of dictionaries.
You wrote this in February, but the discussion was never finished because of my laziness. Now I have tried to split the Stream interface into several parts. So
I've attached a sketched design. It doesn't compile, but it illustrates the structure I have in mind. The main improvement since the new-io library is the addition of memory streams. This is an idea from your library and I like it a lot, although I changed the type of the methods:

  -- | An input stream accessed directly via a memory buffer.
  -- Ordinary 'InputStream's may be converted to 'MemInputStream's by
  -- adding buffering; see 'bufferInputStream'.
  class MemInputStream s where
    -- | Consume some bytes from the memory stream.  The second argument
    -- is an IO action that is passed a buffer and its size (the size must
    -- be non-zero), and it should return the number of bytes consumed.
    withStreamInputBuffer :: s -> (Ptr Word8 -> Int -> IO Int) -> IO ()

  -- | An output stream accessed directly via a memory buffer.
  -- Ordinary 'OutputStream's may be converted to 'MemOutputStream's by
  -- adding buffering; see 'bufferOutputStream'.
  class MemOutputStream s where
    -- | Write some bytes to a memory stream.  The second argument
    -- is an IO action that is passed a buffer and its size (the size must
    -- be non-zero), and it should return the number of bytes written.
    withStreamOutputBuffer :: s -> (Ptr Word8 -> Int -> IO Int) -> IO ()
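As a rough illustration of how a type might implement the input half of this interface, here is a minimal sketch over a fixed block of memory. It is not part of the attached design: `FixedMem`, `newFixedMem` and `takeBytes` are hypothetical names, and raising an EOF error once the memory is exhausted is an assumption (the class comment only says the size passed to the action must be non-zero).

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Word (Word8)
import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtrBytes, withForeignPtr)
import Foreign.Marshal.Array (peekArray, pokeArray)
import Foreign.Ptr (Ptr, plusPtr)
import System.IO.Error (eofErrorType, mkIOError)

-- The class as given in the message.
class MemInputStream s where
  withStreamInputBuffer :: s -> (Ptr Word8 -> Int -> IO Int) -> IO ()

-- Hypothetical: a fixed block of memory, its size, and a read offset.
data FixedMem = FixedMem (ForeignPtr Word8) Int (IORef Int)

-- Hypothetical constructor: copy a list of bytes into fresh memory.
newFixedMem :: [Word8] -> IO FixedMem
newFixedMem bytes = do
  let n = length bytes
  fp <- mallocForeignPtrBytes n
  withForeignPtr fp $ \p -> pokeArray p bytes
  off <- newIORef 0
  return (FixedMem fp n off)

instance MemInputStream FixedMem where
  withStreamInputBuffer (FixedMem fp size offRef) action = do
    off <- readIORef offRef
    let remaining = size - off
    if remaining <= 0
      -- Assumption: signal end of stream rather than pass a zero size.
      then ioError (mkIOError eofErrorType "withStreamInputBuffer" Nothing Nothing)
      else do
        used <- withForeignPtr fp $ \p -> action (p `plusPtr` off) remaining
        -- Clamp the reported count so the offset stays inside the block.
        writeIORef offRef (off + max 0 (min remaining used))

-- Hypothetical consumer: take up to n bytes from any memory stream.
takeBytes :: MemInputStream s => s -> Int -> IO [Word8]
takeBytes s n = do
  out <- newIORef []
  withStreamInputBuffer s $ \p avail -> do
    let k = max 0 (min n avail)
    bs <- peekArray k p
    writeIORef out bs
    return k
  readIORef out
```

The callback style means the consumer never owns the buffer: it only sees a pointer and a size for the duration of the action, and reports how much it used.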
1) What do you think: should Stream be the base for all other stream classes, or should each stream class be independent? i.e.
Superclasses aren't necessary, but they might help to reduce the size of contexts in practice. We should probably experiment with both.
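To make the trade-off concrete, here is a small sketch of both arrangements. The method `readByte`, the independent class `InputStreamI` and the list-backed `ListStream` are hypothetical and exist only for this comparison. With a superclass, a function that reads and then closes needs one constraint; with independent classes it needs two.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Word (Word8)

class Stream s where
  closeStream :: s -> IO ()

-- Variant A: Stream is a superclass of the input class.
class Stream s => InputStream s where
  readByte :: s -> IO (Maybe Word8)

-- Variant B (hypothetical): an independent input class.
class InputStreamI s where
  readByteI :: s -> IO (Maybe Word8)

-- Hypothetical in-memory stream backed by a list of bytes.
newtype ListStream = ListStream (IORef [Word8])

newListStream :: [Word8] -> IO ListStream
newListStream = fmap ListStream . newIORef

instance Stream ListStream where
  closeStream (ListStream r) = writeIORef r []

instance InputStream ListStream where
  readByte (ListStream r) = do
    xs <- readIORef r
    case xs of
      []       -> return Nothing
      (y : ys) -> writeIORef r ys >> return (Just y)

instance InputStreamI ListStream where
  readByteI = readByte

-- Variant A: one constraint is enough, Stream is implied.
drainAndClose :: InputStream s => s -> IO Int
drainAndClose s = go 0
  where
    go n = do
      m <- readByte s
      case m of
        Nothing -> closeStream s >> return n
        Just _  -> go $! n + 1

-- Variant B: both constraints must be spelled out.
drainAndCloseI :: (Stream s, InputStreamI s) => s -> IO Int
drainAndCloseI s = go 0
  where
    go n = do
      m <- readByteI s
      case m of
        Nothing -> closeStream s >> return n
        Just _  -> go $! n + 1
```

The two functions behave identically; only the contexts differ, which is the cost that grows as more classes (seekable, buffered) are added.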
2) Separating the Stream classes makes some automatic definitions impossible. For example, the released version contains a vGetBuf implementation that is defined via vGetChar and works fine for streams that provide only vGetChar as a base function.
No class should implement both reading bytes and reading chars. Encoding/decoding should be a stream transformer that turns a byte stream into a text stream. So there's no duplication of these methods. I believe splitting up the classes should lead to less duplication, not more, partly because you don't have to implement a lot of methods that don't do anything or are errors (eg. writing to an input stream). I admit I haven't actually written all the code, though.
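A minimal sketch of decoding as a stream transformer might look like the following. All names (`ByteInput`, `TextInput`, `ByteList`, `Latin1Decoder`, `readAllText`) are hypothetical, and Latin-1 is chosen only because it maps one byte to one character, which keeps the example short. The decoder is an instance of the text class only, so applying it to something that is already a text stream is rejected by the type checker.

```haskell
import Data.Char (chr)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Word (Word8)

-- Hypothetical byte-level input class.
class ByteInput s where
  getByte :: s -> IO (Maybe Word8)

-- Hypothetical text-level input class; no type needs to implement both.
class TextInput s where
  getTextChar :: s -> IO (Maybe Char)

-- Hypothetical byte source backed by a list.
newtype ByteList = ByteList (IORef [Word8])

newByteList :: [Word8] -> IO ByteList
newByteList = fmap ByteList . newIORef

instance ByteInput ByteList where
  getByte (ByteList ref) = do
    bytes <- readIORef ref
    case bytes of
      []       -> return Nothing
      (b : bs) -> writeIORef ref bs >> return (Just b)

-- The transformer: wraps any byte stream and exposes it as a text stream.
-- Assumption: Latin-1, so each byte decodes to exactly one Char.
newtype Latin1Decoder s = Latin1Decoder s

instance ByteInput s => TextInput (Latin1Decoder s) where
  getTextChar (Latin1Decoder s) = fmap (fmap (chr . fromIntegral)) (getByte s)

-- Works on any text stream, whatever it is layered on.
readAllText :: TextInput s => s -> IO String
readAllText s = do
  mc <- getTextChar s
  case mc of
    Nothing -> return []
    Just c  -> fmap (c :) (readAllText s)
```

Because `Latin1Decoder s` has no `ByteInput` instance, `Latin1Decoder (Latin1Decoder src)` has no `TextInput` instance and fails to compile when used as a text stream.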
3) The problems have grown substantially now that I have tried to separate input and output streams (the same will apply to splitting seekable streams off into a separate class). The problem is that I need either to provide 2 or 3 separate implementations of buffering for read-only, write-only and read-write streams, or to have some universal definition that works even when the base Stream doesn't provide some of the operations. The latter seems to be impossible. Maybe I don't understand Haskell's class system well enough?
let's see:
data BufferedStream h = Buf h ....
vClose (Buf h ...) = vPutBuf ...  -- flush buffer's contents
How can I implement this if `h` may not support the vPutBuf operation, especially while still allowing read/write streams to work?
You can only buffer a byte stream. See my sketch design.
4) What do you mean by "There are many combinations of stream transformers that don't make sense"? Splitting the Stream class into BlockStream/TextStream/ByteStream, or something else?
Yes - adding decoding to a TextStream doesn't make sense. Directly accessing the memory of a byte stream doesn't make sense: you need to buffer it first, or use a memory-mapped stream.

It is still possible to implement read/write files using this structure. There's nothing stopping you having a type that is an instance of both InputStream and OutputStream (eg. a read/write file), and layering buffering on top of this would yield a buffered input/output stream in which the buffer contains only input or output data.

Cheers,
Simon

{-# OPTIONS -fglasgow-exts -cpp #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  System.IO.Stream
-- Copyright   :  (c) various
-- License     :  see libraries/base/LICENSE
--
-- Maintainer  :  simonmar@microsoft.com
-- Stability   :  experimental
-- Portability :  non-portable (existentials, ghc extensions)
--
-- InputStreams and OutputStreams are classes of objects which support
-- input and output respectively.  Streams can be layered on top of various
-- underlying I/O objects (such as files or sockets).  Stream transformers
-- can be applied to turn streams of one type into streams of another type.
--
-----------------------------------------------------------------------------

module System.IO.Stream (
    -- * Streams
    {-class-} Stream(..),
    {-class-} InputStream(..),
    {-class-} OutputStream(..),

    streamGet,
    streamReadBuffer,
    streamPut,
    streamWriteBuffer,

    -- * Stream connections
    PipeInputStream,
    PipeOutputStream,
    streamPipe,
    streamConnect,

    -- * Memory streams
    {-class-} MemInputStream,
    {-class-} MemOutputStream,

    -- ** Converting memory streams to I/O streams
    MemToInputStream, memToInputStream,
    MemToOutputStream, memToOutputStream,

    -- * Buffering
    BufferMode(..),
    BufferedInputStream, bufferInputStream,
    BufferedOutputStream, bufferOutputStream,
  ) where

import System.IO.Buffer

import Foreign
import Data.Word          ( Word8 )
import System.IO          ( BufferMode(..) )
import System.IO.Error    ( mkIOError, eofErrorType )
import Control.Exception  ( assert )
import Control.Monad      ( when, liftM )
import Control.Concurrent
import Data.IORef

import GHC.Exts
import GHC.Ptr            ( Ptr(..) )
import GHC.IOBase         ( IO(..), ioException )
import GHC.Handle         ( ioe_EOF )

#define UPK {-# UNPACK #-} !

-- -----------------------------------------------------------------------------
-- Streams

class Stream s where
  -- | closes a stream
  closeStream :: s -> IO ()

  -- | returns 'True' if the stream is open
  streamIsOpen :: s -> IO Bool

  -- | ToDo: objections have been raised about this method, and
  -- are still to be resolved.  It doesn't make as much sense
  -- for output streams as it does for input streams.
  streamIsEOS :: s -> IO Bool

  -- | Returns 'True' if there is data available to read from this
  -- stream.  Returns 'False' if either there is no data available, or
  -- the end of the stream has been reached.
  streamReady :: s -> IO Bool

  -- | Returns the number of bytes that can be transferred to/from
  -- this stream, if known.
  streamRemaining :: s -> IO (Maybe Integer)

-- | An 'InputStream' is a basic I/O object which supports reading a
-- stream of 'Word8's.  It is expected that 'InputStream's are unbuffered:
-- buffering is layered on top of one of these.
class Stream s => InputStream s where
  -- | Grabs data without blocking, but only if there is data available.
  -- If there is none, then waits for some.  This function may only
  -- return zero if either the requested length is zero or the end of stream
  -- has been reached.
  streamReadBufferNonBlocking :: s -> Integer -> Ptr Word8 -> IO Integer

-- | Reads a single 'Word8' from a stream.
streamGet :: InputStream s => s -> IO Word8
streamGet s =
  alloca $ \p -> do
    r <- streamReadBufferNonBlocking s 1 p
    if r == 0 then ioe_EOF else peek p

-- | Reads data from the stream into a 'Buffer'.  Returns the
-- number of elements that were read, which may only be less than the
-- requested length if the end of the stream was reached.
streamReadBuffer :: InputStream s => s -> Integer -> Ptr Word8 -> IO Integer
streamReadBuffer s 0 buf = return 0
streamReadBuffer s len ptr = streamReadBufferLoop s ptr 0 (fromIntegral len)

streamReadBufferLoop :: InputStream s => s -> Ptr Word8 -> Integer -> Integer
                     -> IO Integer
streamReadBufferLoop s ptr off len = do
  r <- streamReadBufferNonBlocking s len ptr
  if r == 0
     then return (fromIntegral off)
     else if (r < len)
             then streamReadBufferLoop s (ptr `plusPtr` fromIntegral r)
                                       (off+r) (len-r)
             else return (off+r)

-- -----------------------------------------------------------------------------
-- Output streams

class Stream s => OutputStream s where
  -- | Writes data to an output stream.  It will write at least one
  -- byte, but will only write further bytes if it can do so without
  -- blocking.
  --
  -- The result may never be 0 if the requested write size was > 0.
  -- If no bytes can be written to the stream, then the
  -- 'streamWriteBufferNonBlocking' should raise an exception
  -- indicating the cause of the problem (eg. the stream is closed).
  streamWriteBufferNonBlocking :: s -> Integer -> Ptr Word8 -> IO Integer

-- | Writes a single byte to an output stream.
streamPut :: OutputStream s => s -> Word8 -> IO ()
streamPut s word = with word $ \p -> streamWriteBuffer s 1 p

-- | Writes data to a stream, only returns when all the data has been
-- written.
streamWriteBuffer :: OutputStream s => s -> Integer -> Ptr Word8 -> IO ()
streamWriteBuffer s 0 ptr = return ()
streamWriteBuffer s len ptr = streamWriteBufferLoop s ptr 0 len

streamWriteBufferLoop :: OutputStream s => s -> Ptr Word8 -> Integer -> Integer
                      -> IO ()
streamWriteBufferLoop s ptr off len =
  seq off $ -- strictness hack
  if len == 0
     then return ()
     else do
       r <- streamWriteBufferNonBlocking s len ptr
       assert (r /= 0) $ do
         if (r < len)
            then streamWriteBufferLoop s (ptr `plusPtr` fromIntegral r)
                                       (off+r) (len-r)
            else return ()

-- ----------------------------------------------------------------------------
-- Connecting streams

data PipeInputStream
data PipeOutputStream

instance Stream PipeInputStream        -- ToDo
instance InputStream PipeInputStream   -- ToDo

instance Stream PipeOutputStream       -- ToDo
instance OutputStream PipeOutputStream -- ToDo

streamPipe :: IO (PipeInputStream, PipeOutputStream)
streamPipe = error "unimplemented: streamOutputToInput"

-- | Takes an output stream and an input stream, and pipes all the
-- data from the former into the latter.
streamConnect :: (OutputStream o, InputStream i) => o -> i -> IO ()
streamConnect = error "unimplemented: streamInputToOutput"

-- ----------------------------------------------------------------------------
-- Memory streams

-- | An input stream accessed directly via a memory buffer.
-- Ordinary 'InputStream's may be converted to 'MemInputStream's by
-- adding buffering; see 'bufferInputStream'.
class MemInputStream s where
  -- | Consume some bytes from the memory stream.  The second argument
  -- is an IO action that is passed a buffer and its size (the size must
  -- be non-zero), and it should return the number of bytes consumed.
  withStreamInputBuffer :: s -> (Ptr Word8 -> Int -> IO Int) -> IO ()

-- | An output stream accessed directly via a memory buffer.
-- Ordinary 'OutputStream's may be converted to 'MemOutputStream's by
-- adding buffering; see 'bufferOutputStream'.
class MemOutputStream s where
  -- | Write some bytes to a memory stream.  The second argument
  -- is an IO action that is passed a buffer and its size (the size must
  -- be non-zero), and it should return the number of bytes written.
  withStreamOutputBuffer :: s -> (Ptr Word8 -> Int -> IO Int) -> IO ()

-- -----------------------------------------------------------------------------
-- A memory stream can be converted to an ordinary byte stream

newtype MemToInputStream s = MemToInputStream s
  deriving (Stream, MemInputStream)

newtype MemToOutputStream s = MemToOutputStream s
  deriving (Stream, MemOutputStream)

-- Rationale: what we really want is
--    instance MemInputStream s => InputStream s
-- but that overlaps.  So instead we provide a way to convert
-- every MemInputStream into something that is an instance of
-- InputStream.

instance InputStream (MemToInputStream s) where
instance OutputStream (MemToOutputStream s) where

-- The wrappers take the underlying stream as an argument.
memToInputStream :: MemInputStream s => s -> MemToInputStream s
memToInputStream = MemToInputStream

memToOutputStream :: MemOutputStream s => s -> MemToOutputStream s
memToOutputStream = MemToOutputStream

-- -----------------------------------------------------------------------------
-- Buffering

-- | Operations on a stream with a buffer
class Buffered s where
  -- | Sets the buffering mode on the stream.  Returns 'True' if
  -- the buffering mode was set, or 'False' if it wasn't.  If the
  -- stream does not support buffering, it may return 'False'.
  setBufferMode :: s -> BufferMode -> IO Bool

  -- | Returns the current buffering mode for a stream.  On a
  -- stream that does not support buffering, the result will always
  -- be 'NoBuffering'.
  getBufferMode :: s -> IO BufferMode

  -- | Returns the number of bytes of data in the buffer
  countBufferedBytes :: s -> IO Int

-- | Operations on an output stream with a buffer
class OutputBuffered s where
  -- | Flushes the buffer to the operating system
  flush :: s -> IO ()

  -- | Flushes the buffered data as far as possible, even to the
  -- physical media if it can.  It returns 'True' if the data
  -- has definitely been flushed as far as it can go: to the
  -- disk for a disk file, to the screen for a terminal, and so on.
  sync :: s -> IO Bool

-- | Operations on an input stream with a buffer
class InputBuffered s where
  -- | Discards the input buffer
  discard :: s -> IO ()

  -- | Pushes back the buffered data, if possible.  Returns 'True' if
  -- the buffer could be pushed back, 'False' otherwise.
  pushback :: s -> IO Bool
  -- TODO: Seekable superclass allows pushback?

-- | An 'InputStream' with buffering added
data BufferedInputStream s
  = BufferedInputStream s BufferMode !(IORef (Buffer Word8))

-- | An 'OutputStream' with buffering added
data BufferedOutputStream s
  = BufferedOutputStream s BufferMode !(IORef (Buffer Word8))

bufferSize :: BufferMode -> Int
bufferSize (BlockBuffering (Just size)) = size
bufferSize _ = dEFAULT_BUFFER_SIZE

-- | Add buffering to an 'InputStream'
bufferInputStream :: InputStream s => s -> BufferMode
                  -> IO (BufferedInputStream s)
bufferInputStream stream bmode = do
  buffer <- allocateBuffer (bufferSize bmode)
  r <- newIORef buffer
  return (BufferedInputStream stream bmode r)

-- | Add buffering to an 'OutputStream'
bufferOutputStream :: OutputStream s => s -> BufferMode
                   -> IO (BufferedOutputStream s)
bufferOutputStream stream bmode = do
  buffer <- allocateBuffer (bufferSize bmode)
  r <- newIORef buffer
  return (BufferedOutputStream stream bmode r)

instance Stream s => MemInputStream (BufferedInputStream s) where
  withStreamInputBuffer b@(BufferedInputStream s bmode ref) action = do
    buffer <- readIORef ref
    if emptyBuffer buffer
       then do
         buffer' <- fillReadBuffer s buffer
         writeIORef ref buffer'
         withStreamInputBuffer b action
       else do
         let used = bufferUsed buffer
         count <- withBuffer buffer $ \ptr ->
                    action (ptr `plusPtr` bufRPtr buffer) used
         let buffer' = bufferRemove (min used count)
         checkBufferInvariants buffer'
         writeIORef ref $! buffer'

instance Stream s => MemOutputStream (BufferedOutputStream s) where
  withStreamOutputBuffer b@(BufferedOutputStream s bmode ref) action = do
    buffer <- readIORef ref
    let avail = bufferAvailable buffer
    count <- withBuffer buffer $ \ptr ->
               action (ptr `plusPtr` bufWPtr buffer) avail
    -- never record more bytes than the buffer had room for
    let buffer' = bufferAdd (min count avail)
    if fullBuffer buffer'
       then do writeBuffer s buffer; writeIORef ref (emptyBuffer buffer')
       else writeIORef ref buffer'

instance Stream s => Stream (BufferedInputStream s)
instance Stream s => Stream (BufferedOutputStream s)

instance Buffered (BufferedOutputStream s)
instance OutputBuffered (BufferedOutputStream s)

instance Buffered (BufferedInputStream s)
instance InputBuffered (BufferedInputStream s)
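
Since the attached module is only a sketch and depends on a `System.IO.Buffer` module that is not shown, the idea of layering output buffering on top of a byte stream can be tried out in a much simplified, self-contained form. Everything here is hypothetical and chosen for brevity: the class `ByteOutput`, the recording `Sink`, the wrapper `BufferedOut`, and the use of lists in place of raw memory.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef, writeIORef)
import Data.Word (Word8)

-- Hypothetical unbuffered byte output class.
class ByteOutput s where
  writeBytes :: s -> [Word8] -> IO ()

-- Hypothetical sink that records every chunk it receives (newest first).
newtype Sink = Sink (IORef [[Word8]])

newSink :: IO Sink
newSink = fmap Sink (newIORef [])

-- Chunks in the order they were written.
sinkChunks :: Sink -> IO [[Word8]]
sinkChunks (Sink r) = fmap reverse (readIORef r)

instance ByteOutput Sink where
  writeBytes (Sink r) bs = modifyIORef r (bs :)

-- Buffering layered on any byte output: the underlying stream, a capacity,
-- and the pending bytes (kept in reverse order).
data BufferedOut s = BufferedOut s Int (IORef [Word8])

bufferOut :: ByteOutput s => s -> Int -> IO (BufferedOut s)
bufferOut s cap = fmap (BufferedOut s (max 1 cap)) (newIORef [])

-- Write whatever is pending to the underlying stream in one chunk.
flushOut :: ByteOutput s => BufferedOut s -> IO ()
flushOut (BufferedOut s _ ref) = do
  pending <- readIORef ref
  if null pending
    then return ()
    else do
      writeBytes s (reverse pending)
      writeIORef ref []

-- Append one byte, flushing once the buffer reaches its capacity.
putByte :: ByteOutput s => BufferedOut s -> Word8 -> IO ()
putByte b@(BufferedOut _ cap ref) w = do
  pending <- readIORef ref
  let pending' = w : pending
  writeIORef ref pending'
  if length pending' >= cap then flushOut b else return ()
```

The wrapper only requires the byte-level class from the stream beneath it, which matches the point that buffering applies to byte streams and not to text streams.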