
I have following problem: I'd like to operate on big files so I'd prefere to operate on 'stream' instead of whole file at a time to avoid keeping too much in memory. I need to calculate MD5 and compress file. I tried to use something like that but I'm afraid that I'd need to patch zlib package as it results in deadlock:
{-# LANGUAGE GADTs #-} import Codec.Compression.GZip import Control.Applicative import Control.Concurrent.CHP import qualified Control.Concurrent.CHP.Common as CHP import Control.Concurrent.CHP.Enroll import Control.Concurrent.CHP.Utils import Control.Monad.State.Strict import Data.Digest.Pure.MD5 import Data.Maybe import qualified Data.ByteString.Char8 as BS import qualified Data.ByteString.Lazy.Char8 as LBS import qualified Data.ByteString.Lazy.Internal as LBS import System.Environment import System.IO import System.IO.Unsafe
calculateMD5 :: (ReadableChannel r, Poisonable (r (Maybe BS.ByteString)), WriteableChannel w, Poisonable (w MD5Digest)) => r (Maybe BS.ByteString) -> w MD5Digest -> CHP () calculateMD5 in_ out = evalStateT (forever loop) md5InitialContext `onPoisonRethrow` (poison in_ >> poison out) where loop = liftCHP (readChannel in_) >>= calc' calc' Nothing = gets md5Finalize >>= liftCHP . writeChannel out >> put md5InitialContext calc' (Just b) = modify (flip md5Update $ LBS.fromChunks [b])
Calculate MD5 hash of input stream. Nothing indicates EOF.
unsafeInterleaveCHP :: CHP a -> CHP a unsafeInterleaveCHP = fromJust <.> liftIO <=< unsafeInterleaveIO <.> embedCHP
Helper function. It is suppose to move the execution in time - just as unsafeInterleaveIO. I belive that the main problem lives here. Especially that Maybe.fromJust: Nothing is the error.
chan2List :: (ReadableChannel r, Poisonable (r a)) => r a -> CHP [a] chan2List in_ = unsafeInterleaveCHP ((liftM2 (:) (readChannel in_) (chan2List in_)) `onPoisonTrap` return [])
Changes channel to lazy read list.
chanMaybe2List :: (ReadableChannel r, Poisonable (r (Maybe a))) => r (Maybe a) -> CHP [[a]] chanMaybe2List in_ = splitByMaybe <$> chan2List where splitByMaybe [] = [] splitByMaybe (Nothing:xs) = []:splitByMaybe xs splitByMaybe (Just v :[]) = [[v]] splitByMaybe (Just v :xs) = let (y:ys) = splitByMaybe xs in (v:y):ys
Reads lazyly from channel o list of list
compressCHP :: (ReadableChannel r, Poisonable (r (Maybe BS.ByteString)), WriteableChannel w, Poisonable (w (Maybe BS.ByteString))) => r (Maybe BS.ByteString) -> w (Maybe BS.ByteString) -> CHP () compressCHP in_ out = toOut >>= mapM_ sendBS where in_' :: CHP [LBS.ByteString] in_' = fmap LBS.fromChunks <$> chanMaybe2List in_ toOut :: CHP [LBS.ByteString] toOut = fmap compress <$> in_' sendBS :: LBS.ByteString -> CHP () sendBS LBS.Empty = writeChannel out Nothing sendBS (LBS.Chunk c r) = writeChannel out (Just c) >> sendBS r
Compress process
readFromFile :: (ReadableChannel r, Poisonable (r String), WriteableChannel w, Poisonable (w (Maybe BS.ByteString))) => r String -> w (Maybe BS.ByteString) -> CHP () readFromFile file data_ = forever (do path <- readChannel file hnd <- liftIO $ openFile path ReadMode let copy = liftIO (BS.hGet hnd LBS.defaultChunkSize) >>= writeChannel data_ . Just copy `onPoisonRethrow` liftIO (hClose hnd) writeChannel data_ Nothing liftIO $ hClose hnd) `onPoisonRethrow` (poison file >> poison data_)
Process reading from file
writeToFile :: (ReadableChannel r, Poisonable (r String), ReadableChannel r', Poisonable (r' (Maybe BS.ByteString))) => r String -> r' (Maybe BS.ByteString) -> CHP () writeToFile file data_ = forever (do path <- readChannel file hnd <- liftIO $ openFile path WriteMode let writeUntilNothing = readChannel data_ >>= writeUntilNothing' writeUntilNothing' Nothing = return () writeUntilNothing' (Just v) = liftIO (BS.hPutStr hnd v) >> writeUntilNothing writeUntilNothing `onPoisonFinally` liftIO (hClose hnd)) `onPoisonRethrow` (poison file >> poison data_)
Process writing to file
getFiles :: (WriteableChannel w, Poisonable (w String)) => w String -> CHP () getFiles out = mapM_ (writeChannel out) ["test1", "test2"] >> poison (out)
Sample files. Each contains "Test1\n"
pipeline1 :: CHP () pipeline1 = do md5sum <- oneToOneChannel' $ chanLabel "MD5" runParallel_ [(getFiles ->|^ ("File", readFromFile) ->|^ ("Data", calculateMD5)) (writer md5sum), forever $ readChannel (reader md5sum) >>= liftIO . print]
First pipeline. Output: fa029a7f2a3ca5a03fe682d3b77c7f0d fa029a7f2a3ca5a03fe682d3b77c7f0d < File."test1", Data.Just "Test1\n", Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, File."test2", Data.Just "Test1\n", Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d >
pipeline2 :: CHP () pipeline2 = enrolling $ do file <- oneToManyChannel' $ chanLabel "File" fileMD5 <- oneToOneChannel' $ chanLabel "File MD5" data_ <- oneToOneChannel' $ chanLabel "Data" md5 <- oneToOneChannel' $ chanLabel "MD5" md5BS <- oneToOneChannel' $ chanLabel "MD5 ByteString" fileMD5' <- Enroll (reader file) fileData <- Enroll (reader file) liftCHP $ runParallel_ [getFiles (writer file), (forever $ readChannel fileMD5' >>= writeChannel (writer fileMD5) . (++".md5")) `onPoisonRethrow` (poison fileMD5' >> poison (writer fileMD5)), readFromFile fileData (writer data_), calculateMD5 (reader data_) (writer md5), (forever $ do v <- readChannel (reader md5) let v' = Just $ BS.pack $ show v writeChannel (writer md5BS) v' writeChannel (writer md5BS) Nothing) `onPoisonRethrow` (poison (writer md5BS) >> poison (reader md5)), writeToFile (reader fileMD5) (reader md5BS)]
Correct pipeline (testing EnrollingT): < _b4, File MD5."test1.md5", Data.Just "Test1\n", Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, _b4, MD5 ByteString.Just "fa029a7f2a3ca5a03fe682d3b77c7f0d", Data.Just "Test1\n", Data.Nothing, MD5 ByteString.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, File MD5."test2.md5", MD5 ByteString.Just "fa029a7f2a3ca5a03fe682d3b77c7f0d", MD5 ByteString.Nothing > % cat test1.md5 fa029a7f2a3ca5a03fe682d3b77c7f0d%
pipeline3 :: CHP () pipeline3 = enrolling $ do file <- oneToManyChannel' $ chanLabel "File" fileGZ <- oneToOneChannel' $ chanLabel "File GZ" data_ <- oneToManyChannel' $ chanLabel "Data" compressed <- oneToManyChannel' $ chanLabel "Data Compressed" md5 <- oneToOneChannel' $ chanLabel "MD5" md5Compressed <- oneToOneChannel' $ chanLabel "MD5 Compressed" fileGZ' <- Enroll (reader file) fileData <- Enroll (reader file) dataMD5 <- Enroll (reader data_) dataCompress <- Enroll (reader data_) compressedFile <- Enroll (reader compressed) compressedMD5 <- Enroll (reader compressed) liftCHP $ runParallel_ [getFiles (writer file), (forever $ readChannel fileGZ' >>= writeChannel (writer fileGZ) . (++".gz")) `onPoisonRethrow` (poison fileGZ' >> poison (writer fileGZ)), readFromFile fileData (writer data_), calculateMD5 dataMD5 (writer md5), compressCHP dataCompress (writer compressed), writeToFile (reader fileGZ) compressedFile, calculateMD5 compressedMD5 (writer md5Compressed), forever $ readChannel dataMD5 >>= liftIO . print >> readChannel compressedMD5 >>= liftIO . print]
Problems: (CHP) Thread terminated with: thread blocked indefinitely in an STM transaction < _b3, _b4, File GZ."test1.gz" >
onPoisonFinally :: CHP a -> CHP () -> CHP a onPoisonFinally m b = (m `onPoisonRethrow` b) <* b
Utility function (used for closing handles)
(<.>) :: Functor f => (b -> c) -> (a -> f b) -> a -> f c f <.> g = fmap f . g
<.> is for <$> as . to $.
instance MonadCHP m => MonadCHP (StateT s m) where liftCHP = lift . liftCHP
Missing instance for strict monad
(->|^) :: Show b => (Chanout b -> CHP ()) -> (String, Chanin b -> c -> CHP ()) -> (c -> CHP ()) (->|^) p (l, q) x = do c <- oneToOneChannel' $ chanLabel l runParallel_ [p (writer c), q (reader c) x]
'Missing' helper function
data EnrollingT a where Lift :: CHP a -> EnrollingT a Enroll :: (Enrollable b z) => b z -> EnrollingT (Enrolled b z)
enrolling :: EnrollingT a -> CHP a enrolling (Lift v) = v enrolling (Enroll b) = enroll b return
instance Monad EnrollingT where (Lift m) >>= f = Lift $ m >>= enrolling . f (Enroll b) >>= f = Lift $ enroll b (enrolling . f) return = Lift . return instance MonadIO EnrollingT where liftIO = Lift . liftIO instance MonadCHP EnrollingT where liftCHP = Lift
Helper monad for enrolling (I know T should stand for transforming but then I realize problems). Thanks in advance