[Very long] (CHP?) Compressing, MD5 and big files

I have the following problem: I'd like to operate on big files, so I'd prefer to work on a stream instead of the whole file at once, to avoid keeping too much in memory. I need to calculate the MD5 and compress the file. I tried something like the code below, but I'm afraid I may need to patch the zlib package, as it results in a deadlock:
{-# LANGUAGE GADTs #-}
import Codec.Compression.GZip
import Control.Applicative
import Control.Concurrent.CHP
import qualified Control.Concurrent.CHP.Common as CHP
import Control.Concurrent.CHP.Enroll
import Control.Concurrent.CHP.Utils
import Control.Monad.State.Strict
import Data.Digest.Pure.MD5
import Data.Maybe
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy.Char8 as LBS
import qualified Data.ByteString.Lazy.Internal as LBS
import System.Environment
import System.IO
import System.IO.Unsafe
calculateMD5 :: (ReadableChannel r,
                 Poisonable (r (Maybe BS.ByteString)),
                 WriteableChannel w,
                 Poisonable (w MD5Digest))
             => r (Maybe BS.ByteString)
             -> w MD5Digest
             -> CHP ()
calculateMD5 in_ out = evalStateT (forever loop) md5InitialContext
                       `onPoisonRethrow` (poison in_ >> poison out)
  where loop = liftCHP (readChannel in_) >>= calc'
        calc' Nothing = gets md5Finalize >>= liftCHP . writeChannel out
                        >> put md5InitialContext
        calc' (Just b) = modify (flip md5Update $ LBS.fromChunks [b])
Calculates the MD5 hash of the input stream. Nothing indicates EOF.
unsafeInterleaveCHP :: CHP a -> CHP a
unsafeInterleaveCHP = fromJust <.> liftIO <=< unsafeInterleaveIO <.> embedCHP
Helper function. It is supposed to defer execution, just as unsafeInterleaveIO does. I believe the main problem lies here, especially since the error is "Maybe.fromJust: Nothing".
chan2List :: (ReadableChannel r, Poisonable (r a)) => r a -> CHP [a]
chan2List in_ = unsafeInterleaveCHP ((liftM2 (:) (readChannel in_) (chan2List in_))
                                     `onPoisonTrap` return [])
Turns a channel into a lazily read list.
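As a side illustration of the deferral idea outside CHP, here is a minimal sketch in plain IO. The names lazyUntilNothing and demoLazy are made up for this example, and the IORef-backed source merely stands in for a channel; it is not how CHP channels behave with respect to poison.

```haskell
import Control.Exception (evaluate)
import Data.IORef (newIORef, readIORef, writeIORef)
import System.IO.Unsafe (unsafeInterleaveIO)

-- Hypothetical helper: build a lazy list from an action that yields
-- Just x for each element and Nothing at the end. Each element is
-- only pulled from the source when the list is forced that far.
lazyUntilNothing :: IO (Maybe a) -> IO [a]
lazyUntilNothing next = unsafeInterleaveIO $ do
  m <- next
  case m of
    Nothing -> return []
    Just x  -> fmap (x :) (lazyUntilNothing next)

-- Stand-in source: yields 1, 2, 3 and then Nothing, counting the pulls.
demoLazy :: IO ([Int], Int)
demoLazy = do
  counter <- newIORef (0 :: Int)
  let next = do
        n <- readIORef counter
        if n >= 3
          then return Nothing
          else writeIORef counter (n + 1) >> return (Just (n + 1))
  xs <- lazyUntilNothing next
  let firstTwo = take 2 xs
  _ <- evaluate (length firstTwo)   -- force exactly two list cells
  pulled <- readIORef counter       -- how many elements were read so far
  return (firstTwo, pulled)
```

Here demoLazy returns ([1,2], 2): only the two elements that were demanded have been read from the source, which is the behaviour chan2List is aiming for.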
chanMaybe2List :: (ReadableChannel r, Poisonable (r (Maybe a)))
               => r (Maybe a) -> CHP [[a]]
chanMaybe2List in_ = splitByMaybe <$> chan2List in_
  where splitByMaybe [] = []
        splitByMaybe (Nothing:xs) = []:splitByMaybe xs
        splitByMaybe (Just v :[]) = [[v]]
        splitByMaybe (Just v :xs) = let (y:ys) = splitByMaybe xs
                                    in (v:y):ys
Reads lazily from the channel into a list of lists.
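For reference, the splitting step can also be written without the partial let pattern. This is only a sketch on plain lists; splitOnNothing is a name invented here, and it is not a drop-in replacement for the channel code.

```haskell
import Data.Maybe (catMaybes, isJust)

-- Hypothetical pure variant of splitByMaybe: every Nothing closes the
-- current group. span is lazy, so groups are produced incrementally.
splitOnNothing :: [Maybe a] -> [[a]]
splitOnNothing [] = []
splitOnNothing xs =
  let (group, rest) = span isJust xs
  in catMaybes group : case rest of
       []        -> []                    -- input ended without a Nothing
       (_ : more) -> splitOnNothing more  -- drop the Nothing delimiter
```

For example, splitOnNothing [Just 1, Just 2, Nothing, Just 3, Nothing] gives [[1,2],[3]].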
compressCHP :: (ReadableChannel r,
                Poisonable (r (Maybe BS.ByteString)),
                WriteableChannel w,
                Poisonable (w (Maybe BS.ByteString)))
            => r (Maybe BS.ByteString)
            -> w (Maybe BS.ByteString)
            -> CHP ()
compressCHP in_ out = toOut >>= mapM_ sendBS
  where in_' :: CHP [LBS.ByteString]
        in_' = fmap LBS.fromChunks <$> chanMaybe2List in_
        toOut :: CHP [LBS.ByteString]
        toOut = fmap compress <$> in_'
        sendBS :: LBS.ByteString -> CHP ()
        sendBS LBS.Empty = writeChannel out Nothing
        sendBS (LBS.Chunk c r) = writeChannel out (Just c) >> sendBS r
Compress process
readFromFile :: (ReadableChannel r,
                 Poisonable (r String),
                 WriteableChannel w,
                 Poisonable (w (Maybe BS.ByteString)))
             => r String
             -> w (Maybe BS.ByteString)
             -> CHP ()
readFromFile file data_ =
  forever (do path <- readChannel file
              hnd <- liftIO $ openFile path ReadMode
              let copy = liftIO (BS.hGet hnd LBS.defaultChunkSize) >>= writeChannel data_ . Just
              copy `onPoisonRethrow` liftIO (hClose hnd)
              writeChannel data_ Nothing
              liftIO $ hClose hnd)
  `onPoisonRethrow` (poison file >> poison data_)
Process reading from file
writeToFile :: (ReadableChannel r,
                Poisonable (r String),
                ReadableChannel r',
                Poisonable (r' (Maybe BS.ByteString)))
            => r String
            -> r' (Maybe BS.ByteString)
            -> CHP ()
writeToFile file data_ =
  forever (do path <- readChannel file
              hnd <- liftIO $ openFile path WriteMode
              let writeUntilNothing = readChannel data_ >>= writeUntilNothing'
                  writeUntilNothing' Nothing = return ()
                  writeUntilNothing' (Just v) = liftIO (BS.hPutStr hnd v) >> writeUntilNothing
              writeUntilNothing `onPoisonFinally` liftIO (hClose hnd))
  `onPoisonRethrow` (poison file >> poison data_)
Process writing to file
getFiles :: (WriteableChannel w, Poisonable (w String)) => w String -> CHP ()
getFiles out = mapM_ (writeChannel out) ["test1", "test2"] >> poison (out)
Sample files. Each contains "Test1\n"
pipeline1 :: CHP ()
pipeline1 = do md5sum <- oneToOneChannel' $ chanLabel "MD5"
               runParallel_ [(getFiles ->|^ ("File", readFromFile)
                                       ->|^ ("Data", calculateMD5)) (writer md5sum),
                             forever $ readChannel (reader md5sum) >>= liftIO . print]
First pipeline. Output:
fa029a7f2a3ca5a03fe682d3b77c7f0d
fa029a7f2a3ca5a03fe682d3b77c7f0d
< File."test1", Data.Just "Test1\n", Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, File."test2", Data.Just "Test1\n", Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d >
pipeline2 :: CHP ()
pipeline2 = enrolling $ do
  file <- oneToManyChannel' $ chanLabel "File"
  fileMD5 <- oneToOneChannel' $ chanLabel "File MD5"
  data_ <- oneToOneChannel' $ chanLabel "Data"
  md5 <- oneToOneChannel' $ chanLabel "MD5"
  md5BS <- oneToOneChannel' $ chanLabel "MD5 ByteString"
  fileMD5' <- Enroll (reader file)
  fileData <- Enroll (reader file)
  liftCHP $ runParallel_
    [getFiles (writer file),
     (forever $ readChannel fileMD5' >>= writeChannel (writer fileMD5) . (++".md5"))
       `onPoisonRethrow` (poison fileMD5' >> poison (writer fileMD5)),
     readFromFile fileData (writer data_),
     calculateMD5 (reader data_) (writer md5),
     (forever $ do v <- readChannel (reader md5)
                   let v' = Just $ BS.pack $ show v
                   writeChannel (writer md5BS) v'
                   writeChannel (writer md5BS) Nothing)
       `onPoisonRethrow` (poison (writer md5BS) >> poison (reader md5)),
     writeToFile (reader fileMD5) (reader md5BS)]
Correct pipeline (testing EnrollingT):
< _b4, File MD5."test1.md5", Data.Just "Test1\n", Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, _b4, MD5 ByteString.Just "fa029a7f2a3ca5a03fe682d3b77c7f0d", Data.Just "Test1\n", Data.Nothing, MD5 ByteString.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, File MD5."test2.md5", MD5 ByteString.Just "fa029a7f2a3ca5a03fe682d3b77c7f0d", MD5 ByteString.Nothing >
% cat test1.md5
fa029a7f2a3ca5a03fe682d3b77c7f0d%
pipeline3 :: CHP ()
pipeline3 = enrolling $ do
  file <- oneToManyChannel' $ chanLabel "File"
  fileGZ <- oneToOneChannel' $ chanLabel "File GZ"
  data_ <- oneToManyChannel' $ chanLabel "Data"
  compressed <- oneToManyChannel' $ chanLabel "Data Compressed"
  md5 <- oneToOneChannel' $ chanLabel "MD5"
  md5Compressed <- oneToOneChannel' $ chanLabel "MD5 Compressed"
  fileGZ' <- Enroll (reader file)
  fileData <- Enroll (reader file)
  dataMD5 <- Enroll (reader data_)
  dataCompress <- Enroll (reader data_)
  compressedFile <- Enroll (reader compressed)
  compressedMD5 <- Enroll (reader compressed)
  liftCHP $ runParallel_
    [getFiles (writer file),
     (forever $ readChannel fileGZ' >>= writeChannel (writer fileGZ) . (++".gz"))
       `onPoisonRethrow` (poison fileGZ' >> poison (writer fileGZ)),
     readFromFile fileData (writer data_),
     calculateMD5 dataMD5 (writer md5),
     compressCHP dataCompress (writer compressed),
     writeToFile (reader fileGZ) compressedFile,
     calculateMD5 compressedMD5 (writer md5Compressed),
     forever $ readChannel dataMD5 >>= liftIO . print
               >> readChannel compressedMD5 >>= liftIO . print]
Problems:
(CHP) Thread terminated with: thread blocked indefinitely in an STM transaction
< _b3, _b4, File GZ."test1.gz" >
onPoisonFinally :: CHP a -> CHP () -> CHP a
onPoisonFinally m b = (m `onPoisonRethrow` b) <* b
Utility function (used for closing handles)
(<.>) :: Functor f => (b -> c) -> (a -> f b) -> a -> f c
f <.> g = fmap f . g
<.> is to <$> as . is to $.
instance MonadCHP m => MonadCHP (StateT s m) where
  liftCHP = lift . liftCHP
Missing MonadCHP instance for the strict StateT monad
(->|^) :: Show b
       => (Chanout b -> CHP ())
       -> (String, Chanin b -> c -> CHP ())
       -> (c -> CHP ())
(->|^) p (l, q) x = do c <- oneToOneChannel' $ chanLabel l
                       runParallel_ [p (writer c), q (reader c) x]
'Missing' helper function
data EnrollingT a where
  Lift :: CHP a -> EnrollingT a
  Enroll :: (Enrollable b z) => b z -> EnrollingT (Enrolled b z)

enrolling :: EnrollingT a -> CHP a
enrolling (Lift v) = v
enrolling (Enroll b) = enroll b return

instance Monad EnrollingT where
  (Lift m) >>= f = Lift $ m >>= enrolling . f
  (Enroll b) >>= f = Lift $ enroll b (enrolling . f)
  return = Lift . return

instance MonadIO EnrollingT where
  liftIO = Lift . liftIO

instance MonadCHP EnrollingT where
  liftCHP = Lift
Helper monad for enrolling (I know the T should stand for "transformer", but I ran into problems when going that way).

Thanks in advance

On Sun, 2010-01-03 at 17:34 +0100, Maciej Piechotka wrote:
> I have the following problem: I'd like to operate on big files, so I'd prefer to work on a stream instead of the whole file at once, to avoid keeping too much in memory. I need to calculate the MD5 and compress the file.
> I tried something like the code below, but I'm afraid I may need to patch the zlib package, as it results in a deadlock:
If I add:
pipeline4 = do file <- oneToOneChannel' $ chanLabel "File"
               data_ <- oneToOneChannel' $ chanLabel "Data"
               compressed <- oneToOneChannel' $ chanLabel "Compressed"
               runParallel_ [getFiles (writer file),
                             readFromFile (reader file) (writer data_),
                             compressCHP (reader data_) (writer compressed),
                             CHP.consume (reader compressed)]
And change compress to the following (I have not tested pipeline4 without this change, but here I omit the explicit interleave):
stateM :: Monad m => a -> (a -> m a) -> m b
stateM i f = f i >>= flip stateM f
Like forever but with state
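Since stateM never returns normally, the only way out of the loop is through the monad itself (poison in CHP). A small sketch using Either in place of CHP shows the shape; step and countTo are names made up for this illustration.

```haskell
-- Same definition as in the message.
stateM :: Monad m => a -> (a -> m a) -> m b
stateM i f = f i >>= flip stateM f

-- Hypothetical step function: count upwards and abort with Left
-- once the limit is reached (Left plays the role of poison here).
step :: Int -> Int -> Either Int Int
step limit n
  | n >= limit = Left n
  | otherwise  = Right (n + 1)

-- Threads the counter through the loop until step aborts.
countTo :: Int -> Either Int b
countTo limit = stateM 0 (step limit)
```

Here countTo 3 evaluates to Left 3: the state is threaded through three successful steps and the fourth call aborts the loop.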
chanMaybe2List :: (ReadableChannel r,
                   Poisonable (r (Maybe a)),
                   WriteableChannel w,
                   Poisonable (w [a]))
               => r (Maybe a)
               -> w [a]
               -> CHP ()
chanMaybe2List in_ out = do
    chan <- liftIO $ newChan
    list <- liftIO ((Just Nothing :) <$> getChanContents chan)
    runParallel_ [forever (readChannel in_ >>= liftIO . writeChan chan . Just)
                    `onPoisonRethrow` (liftIO (writeChan chan Nothing) >> poison in_),
                  forever $ stateM list process]
  where process (Nothing :_) = poison out >> throwPoison
        process (Just Nothing:Nothing:_) = poison out >> throwPoison
        process (Just Nothing:xs) =
          let (this, that) = span isJust xs
              isJust = maybe False (maybe False (const True))
              this' = map fromJust (map fromJust this)
          in writeChannel out (map fromJust $ map fromJust this) >> process that
Writes to the output a lazy list of all the elements in the input
compressCHP' :: (ReadableChannel r,
                 Poisonable (r [BS.ByteString]),
                 WriteableChannel w,
                 Poisonable (w [BS.ByteString]))
             => r [BS.ByteString]
             -> w [BS.ByteString]
             -> CHP ()
compressCHP' in_ out =
  forever (writeChannel out . LBS.toChunks . compress . LBS.fromChunks =<< readChannel in_)
  `onPoisonRethrow` (poison in_ >> poison out)
Compresses the lists of chunks
toMaybeList :: (ReadableChannel r,
                Poisonable (r [a]),
                WriteableChannel w,
                Poisonable (w (Maybe a)))
            => r [a]
            -> w (Maybe a)
            -> CHP ()
toMaybeList in_ out =
  forever (readChannel in_ >>= mapM_ (writeChannel out . Just) >> writeChannel out Nothing)
  `onPoisonRethrow` (poison in_ >> poison out)
Converts the lists back into a stream of Maybe values (Nothing ends each list)
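On plain lists, the same conversion is a one-liner. This is only a sketch of the encoding, with flattenGroups being a name invented for the example.

```haskell
-- Hypothetical pure counterpart of toMaybeList: emit each element
-- wrapped in Just and close every group with a Nothing.
flattenGroups :: [[a]] -> [Maybe a]
flattenGroups = concatMap (\group -> map Just group ++ [Nothing])
```

For example, flattenGroups [[1,2],[3]] gives [Just 1, Just 2, Nothing, Just 3, Nothing].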
compressCHP :: (ReadableChannel r,
                Poisonable (r (Maybe BS.ByteString)),
                WriteableChannel w,
                Poisonable (w (Maybe BS.ByteString)))
            => r (Maybe BS.ByteString)
            -> w (Maybe BS.ByteString)
            -> CHP ()
compressCHP = chanMaybe2List |->| compressCHP' |->| toMaybeList
Combines all 3 operations.

However, pipeline3 still results in deadlock:
Just "Test1\n"
(CHP) Thread terminated with: thread blocked indefinitely in an STM transaction
< _b3, _b4, _b4, File GZ."test1.gz", _c5, _b6, _c7, _b3 >

Well, at least I know where the problem is not.

Regards

Hi,

Sorry for the slightly delayed reply -- I didn't have time to look through all your code and understand it until just now. Your code has one (no doubt frustratingly!) small problem, which is in the deadlocking pipeline3:

Maciej Piechotka wrote:
> pipeline3 :: CHP ()
> pipeline3 = enrolling $ do
>   file <- oneToManyChannel' $ chanLabel "File"
>   fileGZ <- oneToOneChannel' $ chanLabel "File GZ"
>   data_ <- oneToManyChannel' $ chanLabel "Data"
>   compressed <- oneToManyChannel' $ chanLabel "Data Compressed"
>   md5 <- oneToOneChannel' $ chanLabel "MD5"
>   md5Compressed <- oneToOneChannel' $ chanLabel "MD5 Compressed"
>   fileGZ' <- Enroll (reader file)
>   fileData <- Enroll (reader file)
>   dataMD5 <- Enroll (reader data_)
>   dataCompress <- Enroll (reader data_)
>   compressedFile <- Enroll (reader compressed)
>   compressedMD5 <- Enroll (reader compressed)
>   liftCHP $ runParallel_
>     [getFiles (writer file),
>      (forever $ readChannel fileGZ' >>= writeChannel (writer fileGZ) . (++".gz"))
>        `onPoisonRethrow` (poison fileGZ' >> poison (writer fileGZ)),
>      readFromFile fileData (writer data_),
>      calculateMD5 dataMD5 (writer md5),
>      compressCHP dataCompress (writer compressed),
>      writeToFile (reader fileGZ) compressedFile,
>      calculateMD5 compressedMD5 (writer md5Compressed),
>      forever $ readChannel dataMD5 >>= liftIO . print
>                >> readChannel compressedMD5 >>= liftIO . print]
>
> Problems:
> (CHP) Thread terminated with: thread blocked indefinitely in an STM transaction
> < _b3, _b4, File GZ."test1.gz" >
Where you have "readChannel dataMD5" and "readChannel compressedMD5" in the last few lines, you actually meant to have "readChannel (reader md5)" and "readChannel (reader md5Compressed)". Your mistake meant that the former two channels were being used more times in parallel than you had enrolled, and that the latter two channels were being written to but not read from. Either of these mistakes could cause deadlock, which is why you were getting a strange deadlock. Unfortunately, the type system didn't save you this time, because the channel types happened to be the same. It took me a while to find it, too!

On a side note, it would be good to have a static check for these mistakes (using a channel in parallel unsafely, and only using one end of a channel), but the only way I found to use Haskell's type system for this is a rather nasty type-indexed monad. I guess if you use newChannelRW and name both the results, you would get an unused variable warning if you didn't use either end of the channel. This would fix one issue, but not the other.

Hope that helps,

Neil.
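As a lighter-weight illustration of getting the type system to tell such channels apart, here is a sketch with phantom tags. It uses Control.Concurrent.Chan from base rather than CHP, every name in it (Tagged, RawData, Digest, nextDigest and so on) is invented for the example, and it does nothing about the enrollment-count problem.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- Hypothetical phantom tags naming the role of each channel.
data RawData
data Digest

-- A Chan whose type also records which stream it carries.
newtype Tagged tag a = Tagged (Chan a)

newTagged :: IO (Tagged tag a)
newTagged = fmap Tagged newChan

sendTagged :: Tagged tag a -> a -> IO ()
sendTagged (Tagged c) = writeChan c

recvTagged :: Tagged tag a -> IO a
recvTagged (Tagged c) = readChan c

-- Accepts only the digest stream. Passing a 'Tagged RawData String'
-- is a compile-time error even though the payload type is the same.
nextDigest :: Tagged Digest String -> IO String
nextDigest = recvTagged

demoTagged :: IO String
demoTagged = do
  raw     <- newTagged :: IO (Tagged RawData String)
  digests <- newTagged :: IO (Tagged Digest String)
  sendTagged raw "Test1\n"
  sendTagged digests "fa029a7f2a3ca5a03fe682d3b77c7f0d"
  nextDigest digests   -- 'nextDigest raw' would not type-check
```

The tag only helps when the consumer has a specific signature; a polymorphic consumer such as print accepts either channel, so the mix-up would still go through.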
participants (2)
- Maciej Piechotka
- Neil Brown