
On Sun, 2010-01-03 at 17:34 +0100, Maciej Piechotka wrote:
I have the following problem: I'd like to operate on big files, so I'd prefer to process them as a 'stream' instead of the whole file at a time, to avoid keeping too much in memory. I need to calculate the MD5 of each file and compress it.
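The bounded-memory processing described above can be sketched with plain strict ByteStrings: read the file in fixed-size chunks and fold a step function over them, so only one chunk is live at a time. This is a minimal sketch, not the CHP pipeline from this thread. `foldChunks`, `countBytes` and the scratch file name are hypothetical, and the byte counter only stands in for an incremental MD5 context (a real version would update the hash and also feed each chunk to an incremental compressor).

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import System.IO

-- Read a handle in strict chunks of at most `size` bytes and fold `step`
-- over them; only the current chunk (plus the accumulator) is live at a time.
foldChunks :: Int -> (s -> BS.ByteString -> s) -> s -> Handle -> IO s
foldChunks size step s0 h = go s0
  where
    go acc = acc `seq` do
      chunk <- BS.hGetSome h size
      if BS.null chunk
        then return acc            -- an empty chunk means end of file
        else go (step acc chunk)

-- Hypothetical stand-in for an incremental MD5 context: just count bytes.
countBytes :: Int -> BS.ByteString -> Int
countBytes n chunk = n + BS.length chunk

main :: IO ()
main = do
  let path = "foldchunks-demo.bin"   -- hypothetical scratch file
  BS.writeFile path (BC.replicate 100000 'x')
  total <- withBinaryFile path ReadMode (foldChunks 4096 countBytes 0)
  print total                        -- prints 100000
```

The accumulator is forced on every step, so a strict state such as an `Int` (or a hash context) does not build up thunks while the file is read.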
I tried to use something like that, but I'm afraid I'd need to patch the zlib package, as it results in a deadlock:
If I add:

    pipeline4 = do
        file       <- oneToOneChannel' $ chanLabel "File"
        data_      <- oneToOneChannel' $ chanLabel "Data"
        compressed <- oneToOneChannel' $ chanLabel "Compressed"
        runParallel_ [ getFiles (writer file)
                     , readFromFile (reader file) (writer data_)
                     , compressCHP (reader data_) (writer compressed)
                     , CHP.consume (reader compressed)
                     ]

And change compress to the following (I haven't tested it without this change, but here I omit the explicit interleave):

    stateM :: Monad m => a -> (a -> m a) -> m b
    stateM i f = f i >>= flip stateM f

Like forever, but with state.
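To illustrate how stateM behaves, here is the same definition run in the Either monad, where a Left ends the otherwise endless loop, much as throwPoison does inside CHP. The counter-based `step` function is a hypothetical example, not part of the original code.

```haskell
-- stateM as defined above: apply f to the state, feed the result back in,
-- and never return normally; only the monad's own failure can stop it.
stateM :: Monad m => a -> (a -> m a) -> m b
stateM i f = f i >>= flip stateM f

-- Hypothetical step function: increment a counter, and stop via Left at 5.
-- Left plays the role that throwPoison plays inside CHP.
step :: Int -> Either Int Int
step n
  | n >= 5    = Left n
  | otherwise = Right (n + 1)

main :: IO ()
main = print (stateM 0 step :: Either Int ())   -- prints Left 5
```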

    chanMaybe2List :: (ReadableChannel r, Poisonable (r (Maybe a)),
                       WriteableChannel w, Poisonable (w [a]))
                   => r (Maybe a) -> w [a] -> CHP ()
    chanMaybe2List in_ out = do
        chan <- liftIO newChan
        list <- liftIO ((Just Nothing :) <$> getChanContents chan)
        runParallel_
          [ forever (readChannel in_ >>= liftIO . writeChan chan . Just)
              `onPoisonRethrow` (liftIO (writeChan chan Nothing) >> poison in_)
          , forever $ stateM list process
          ]
      where
        process (Nothing : _)                = poison out >> throwPoison
        process (Just Nothing : Nothing : _) = poison out >> throwPoison
        process (Just Nothing : xs) =
            let (this, that) = span isJust xs
                isJust = maybe False (maybe False (const True))
                this' = map fromJust (map fromJust this)
            in writeChannel out this' >> process that

Writes to the output a lazy list of the input elements for each Nothing-terminated run of the input.
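The splitting step in chanMaybe2List can be tried outside CHP. This simplified sketch uses Control.Concurrent.Chan and getChanContents, as chanMaybe2List does, but drops the poison handling (the outer Maybe). The helper `segments` is hypothetical; it applies the same span-based idea to turn a Nothing-separated stream into lazy lists.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, writeChan, getChanContents)
import Data.Maybe (catMaybes, isJust)

-- Split a Nothing-separated stream into lazy segments; because span is lazy,
-- this also works on infinite or still-growing input.
segments :: [Maybe a] -> [[a]]
segments [] = []
segments xs = catMaybes this : segments (drop 1 rest)
  where (this, rest) = span isJust xs

main :: IO ()
main = do
  chan <- newChan
  -- producer thread: two "files", each terminated by Nothing
  _ <- forkIO $ mapM_ (writeChan chan)
         [Just "ab", Just "c", Nothing, Just "d", Nothing]
  contents <- getChanContents chan   -- lazy list that never ends
  -- demand only the two segments that exist
  print (take 2 (segments contents)) -- prints [["ab","c"],["d"]]
```

Only as many segments as are demanded are read from the channel; asking for a third segment here would never complete, since the list returned by getChanContents has no end.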

    compressCHP' :: (ReadableChannel r, Poisonable (r [BS.ByteString]),
                     WriteableChannel w, Poisonable (w [BS.ByteString]))
                 => r [BS.ByteString] -> w [BS.ByteString] -> CHP ()
    compressCHP' in_ out =
        forever (writeChannel out . LBS.toChunks . compress . LBS.fromChunks
                   =<< readChannel in_)
          `onPoisonRethrow` (poison in_ >> poison out)

Compresses each list of chunks.
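For reference, the pure core of compressCHP' can be checked on its own. This sketch assumes that `compress` above is Codec.Compression.GZip.compress from the zlib package (the trace mentions a .gz file); `compressChunks` and `decompressChunks` are hypothetical names. Since each list read from the channel goes through compress separately, each list becomes a complete, independent gzip stream.

```haskell
import qualified Codec.Compression.GZip as GZip
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as LBS

-- The per-message transformation inside compressCHP': treat a list of strict
-- chunks as one lazy ByteString, gzip it, and hand back the output chunks.
compressChunks :: [BS.ByteString] -> [BS.ByteString]
compressChunks = LBS.toChunks . GZip.compress . LBS.fromChunks

-- Inverse, used here only to check the round trip.
decompressChunks :: [BS.ByteString] -> [BS.ByteString]
decompressChunks = LBS.toChunks . GZip.decompress . LBS.fromChunks

main :: IO ()
main = do
  let input = map BC.pack ["Test1\n", "more data\n"]
  -- prints True
  print (BS.concat (decompressChunks (compressChunks input)) == BS.concat input)
```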

    toMaybeList :: (ReadableChannel r, Poisonable (r [a]),
                    WriteableChannel w, Poisonable (w (Maybe a)))
                => r [a] -> w (Maybe a) -> CHP ()
    toMaybeList in_ out =
        forever (readChannel in_ >>= mapM_ (writeChannel out . Just)
                   >> writeChannel out Nothing)
          `onPoisonRethrow` (poison in_ >> poison out)

Converts the lists back into a stream of Just elements, with each list terminated by Nothing.
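A pure counterpart of toMaybeList makes the framing explicit: each list is flattened into Just elements followed by a single Nothing. `toMaybes` is a hypothetical helper for illustration; because concatMap is lazy, it also works on an infinite list of lists.

```haskell
-- Flatten lists into a Nothing-terminated stream of Just elements,
-- mirroring what toMaybeList writes to its output channel.
toMaybes :: [[a]] -> [Maybe a]
toMaybes = concatMap (\xs -> map Just xs ++ [Nothing])

main :: IO ()
main = do
  let files = [["ab", "c"], ["d"]] :: [[String]]
  print (toMaybes files)   -- prints [Just "ab",Just "c",Nothing,Just "d",Nothing]
```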

    compressCHP :: (ReadableChannel r, Poisonable (r (Maybe BS.ByteString)),
                    WriteableChannel w, Poisonable (w (Maybe BS.ByteString)))
                => r (Maybe BS.ByteString) -> w (Maybe BS.ByteString) -> CHP ()
    compressCHP = chanMaybe2List |->| compressCHP' |->| toMaybeList

Combines all three operations. However, pipeline3 still results in a deadlock:

    Just "Test1\n"
    (CHP) Thread terminated with: thread blocked indefinitely in an STM transaction
    < _b3, _b4, _b4, File GZ."test1.gz", _c5, _b6, _c7, _b3 >

Well, at least I know where the problem is not.

Regards