
Deforestation is usually defined as the elimination of an intermediate data structure between a single producer and a single consumer. Jorge Adriano showed an interesting example of one producer feeding two independent consumers. The two streams of data are mutually dependent. Furthermore, the rate of production is non-uniform: generating an element in one stream may require generating a trillion items in the other stream. If the first consumer is being evaluated, that consumer drives the evaluation of the producer until the latter yields an item. The trillion items incidentally produced in the other stream have to be stored somewhere. In more OS-centric terms, the evaluator generally can't reduce two expressions in parallel. If it chooses one stream consumer, the other consumer is set aside. While the first stream consumer "waits" for a value, the second stream consumer is "blocked", and hence the corresponding stream must be buffered, sometimes at great expense.

We will consider the problem of one producer and two consumers in the general setting. We derive a deforested version, which no longer needs to buffer any produced items. The retainer profile shows no space leaks. We also consider "parallel" writing of streams into two distinct files. Our solution is safe, but the i/o is effectively interleaved. The deforested solution is indeed _derived_, via a sequence of equivalent transformations. In fact, the derived code worked on the first try.

In the original example, the two consumers were writing received data into their own files. Let us first generalize the problem. We assume that the two mutually recursive streams are specified by a transition function transition :: (a,b) -> (a,b) on the state of a _pre-stream_. The state of the latter is a pair of values. We obtain the two streams in question by splitting the pre-stream into the fst and the snd components and applying two different filters. The filtering is what makes the rate of production non-uniform.
import System.IO  -- openFile, hPrint, hClose; Haskell 98 called this module IO
infixr 0 $^$
-- Can't do pattern-matching-deconstruction on state because
-- pattern-matching is eager!
(f,g) $^$ state = (f $ fst state, g $ snd state)
stream transition (p,q) state@(xstate,ystate) =
    ((filter p) . (xstate :), (filter q) . (ystate :))
    $^$ (stream transition (p,q) $ transition state)
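As an aside, fst and snd are not the only way to keep $^$ lazy in the state. The following is a minimal sketch, not the article's code: it uses an irrefutable pattern ~(a,b), which Haskell matches only when a component is actually demanded. The type signatures are our additions.

```haskell
infixr 0 $^$

-- Hypothetical alternative to the fst/snd definition. The tilde makes the
-- pattern on the state irrefutable: it is matched only when a component
-- is demanded, so the recursive knot in 'stream' still unfolds lazily.
-- With a plain (a, b) pattern, forcing the result pair would force the
-- recursive call first, and evaluation would never terminate.
($^$) :: (a -> c, b -> d) -> (a, b) -> (c, d)
(f, g) $^$ ~(a, b) = (f a, g b)

-- The article's stream, with a type signature added.
stream :: ((a, b) -> (a, b)) -> (a -> Bool, b -> Bool) -> (a, b) -> ([a], [b])
stream transition (p, q) state@(xstate, ystate) =
  (filter p . (xstate :), filter q . (ystate :))
    $^$ (stream transition (p, q) $ transition state)

-- The article's example transition function, specialised to Integer.
trans1 :: (Integer, Integer) -> (Integer, Integer)
trans1 (x, y) = (y + 1, x)
```

Both definitions of $^$ produce the same streams; the tilde version simply states the laziness in the pattern itself.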
For illustration, we will try the following transition function and the filters:
trans1 (x,y) = (y+1,x)
st_p x = x `mod` 1997 == 0
st_q = const True
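To see how non-uniform the rate is with these filters, one can ask where, in the pre-stream, each element of the filtered first stream appears. The helpers preStream and firstStreamPositions below are hypothetical, written only for this illustration; they use iterate rather than the stream function.

```haskell
-- The article's transition function and first filter, specialised to Integer.
trans1 :: (Integer, Integer) -> (Integer, Integer)
trans1 (x, y) = (y + 1, x)

st_p :: Integer -> Bool
st_p x = x `mod` 1997 == 0

-- Hypothetical helper: the unfiltered pre-stream as a list of states.
preStream :: [(Integer, Integer)]
preStream = iterate trans1 (0, 0)

-- Hypothetical helper: the index in the pre-stream of each element of the
-- filtered first stream. The article's second filter, st_q, accepts every
-- item, so the index also counts the second-stream items that precede it.
firstStreamPositions :: [Int]
firstStreamPositions = [i | (i, (x, _)) <- zip [0 ..] preStream, st_p x]
```

The first five positions are 0, 3993, 3994, 7987 and 7988: before the second element of the first stream (1997) can be delivered, 3993 items of the second stream have been produced.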
The streams are, without filtering:

Main> take 9 $ fst $ stream trans1 (\x->True,\x->True) (0,0)
[0,1,1,2,2,3,3,4,4]
Main> take 9 $ snd $ stream trans1 (\x->True,\x->True) (0,0)
[0,0,1,1,2,2,3,3,4]

and with filtering:

Main> take 9 $ snd $ stream trans1 (st_p,st_q) (0,0)
[0,0,1,1,2,2,3,3,4]
(443 reductions, 782 cells)
Main> take 9 $ fst $ stream trans1 (st_p,st_q) (0,0)
[0,1997,1997,3994,3994,5991,5991,7988,7988]
(6262603 reductions, 9198209 cells, 42 garbage collections)

Generating each item of the first stream indeed requires generating, on average, nearly two thousand items of the second stream. In our first example, the consumers will sum the corresponding streams:
consumers_sum stream n = (f,f) $^$ stream trans1 (st_p,st_q) (0,0)
  where f = sum . take n
More generally, we can write a consumer using a function sfoldl (stream fold). The function is similar to the regular foldl but can be applied to infinite streams (and so it needs a predicate to decide when to stop).
sfoldl sf st z (x:_) | sf x z = z
sfoldl sf st z (x:xs) = sfoldl sf st (st z x) xs
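A minimal usage sketch: sfoldl terminates on an infinite list as soon as the predicate holds. The consumer sumFirst is a hypothetical example in the spirit of the consumers that follow, and the type signature on sfoldl is our addition.

```haskell
-- The article's sfoldl, with a type signature added. It returns the
-- accumulator as soon as the predicate holds for the next element; like
-- the original, it is undefined on a list that ends before that happens.
sfoldl :: (x -> z -> Bool) -> (z -> x -> z) -> z -> [x] -> z
sfoldl sf st z (x:_) | sf x z = z
sfoldl sf st z (x:xs) = sfoldl sf st (st z x) xs

-- Hypothetical consumer: the sum and the count of the first n elements.
sumFirst :: Int -> [Integer] -> (Integer, Int)
sumFirst n = sfoldl stop step (0, 0)
  where
    stop _ (_, count) = count >= n
    step (s, count) v = (s + v, count + 1)
```

For instance, sumFirst 3 [1 ..] is (6,3). Unlike foldl, sfoldl never needs the end of the list, which is what lets it consume an infinite stream.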
Our consumers_sum function can be written as
consumers_sum' stream n = (fst,fst) $^$ (f,f) $^$ stream trans1 (st_p,st_q) (0,0)
  where f = sfoldl sf st (0,0)
        sf x (_,count) = count >= n
        st (sum,count) val = (val+sum,count+1)
In general, the essence of producing and consuming two streams is captured by the expression

     (sfoldl sf1 st1 z1, sfoldl sf2 st2 z2) $^$ stream trans (p,q) (x,y)
 === { definition of stream }
     (sfoldl sf1 st1 z1, sfoldl sf2 st2 z2) $^$
       ( ((filter p) . (x :), (filter q) . (y :)) $^$ stream trans (p,q) $ trans (x,y) )
 === { We know that a $ (b $ c) === (a . b) $ c. Similarly,
         a $^$ (b $^$ c) === ((((.),(.)) $^$ a) $^$ b) $^$ c
       or, in a less bizarre notation,
         a $^$ (b $^$ c) === ((fst a . fst b), (snd a . snd b)) $^$ c }
     ((sfoldl sf1 st1 z1) . (filter p) . (x :),
      (sfoldl sf2 st2 z2) . (filter q) . (y :)) $^$ stream trans (p,q) $ trans (x,y)

where we also took advantage of the associativity of composition. We note that the two folds, sfoldl and the fold implicit in filter, can be fused:

     sfoldlf sf st p z =def= (sfoldl sf st z) . (filter p)

Then

     sfoldlf sf st p z (x:xs)
 === (sfoldl sf st z) $ (filter p (x:xs))
 === { definition of filter, lifting the condition }
     if p x then sfoldl sf st z (x : filter p xs)
            else sfoldl sf st z (filter p xs)
 === { definition of sfoldlf }
     if p x then sfoldl sf st z (x : filter p xs)
            else sfoldlf sf st p z xs
 === { definition of sfoldl }
     if p x then (if sf x z then z else sfoldl sf st (st z x) (filter p xs))
            else sfoldlf sf st p z xs
 === { definition of sfoldlf }
     if p x then (if sf x z then z else sfoldlf sf st p (st z x) xs)
            else sfoldlf sf st p z xs

We have just obtained a recursive equation for sfoldlf. Note that sfoldlf, just like sfoldl, is undefined for empty lists. We can now write our producer-consumers problem as

     (sfoldl sf1 st1 z1, sfoldl sf2 st2 z2) $^$ stream trans (p,q) (x,y)
 === ((sfoldlf sf1 st1 p z1) . (x :),
      (sfoldlf sf2 st2 q z2) . (y :)) $^$ stream trans (p,q) $ trans (x,y)

We already see a notable simplification. Because the filters have moved into the consumers, the two streams are now produced at the same rate! So we can replace a pair of two streams with one stream of pairs. But we can do better.
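The fused equation can be checked against its specification on sample inputs. The following sketch defines sfoldlf by the recursive equation just derived and sfoldlfSpec (a hypothetical name) by the =def= line; the even-number filter and the counting consumer in the test are illustrative assumptions only.

```haskell
-- The article's sfoldl, with a type signature added.
sfoldl :: (x -> z -> Bool) -> (z -> x -> z) -> z -> [x] -> z
sfoldl sf st z (x:_) | sf x z = z
sfoldl sf st z (x:xs) = sfoldl sf st (st z x) xs

-- sfoldlf from the recursive equation just derived: the filter test is
-- fused into the fold, so no filtered list is ever built.
sfoldlf :: (x -> z -> Bool) -> (z -> x -> z) -> (x -> Bool) -> z -> [x] -> z
sfoldlf sf st p z (x:xs)
  | p x       = if sf x z then z else sfoldlf sf st p (st z x) xs
  | otherwise = sfoldlf sf st p z xs

-- Hypothetical name for the specification: sfoldl composed with filter.
sfoldlfSpec :: (x -> z -> Bool) -> (z -> x -> z) -> (x -> Bool) -> z -> [x] -> z
sfoldlfSpec sf st p z = sfoldl sf st z . filter p
```

On the infinite list [0 ..], the two definitions agree for every cut-off tried in the test; only the fused one avoids allocating the filtered list.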
Let's look at the equation for sfoldlf again:

     sfoldlf sf st p z (x:xs)
 === if p x then (if sf x z then z else sfoldlf sf st p (st z x) xs)
            else sfoldlf sf st p z xs

The equation suggests a useful generalization of sfoldl:
-- argument order: sf st done z, as used in the derivations below
sfoldl' sf st True z _ = z
sfoldl' sf st done z (x:xs) | sf x z = sfoldl' sf st True z xs
sfoldl' sf st done z (x:xs) = sfoldl' sf st done (st z x) xs
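A quick sanity check of sfoldl', assuming the sf st done z argument order that the rest of the derivation relies on: with done = False it behaves like sfoldl, and with done = True it returns the seed at once, even on an empty list. The counting consumer in the test is hypothetical.

```haskell
-- The article's sfoldl, with a type signature added.
sfoldl :: (x -> z -> Bool) -> (z -> x -> z) -> z -> [x] -> z
sfoldl sf st z (x:_) | sf x z = z
sfoldl sf st z (x:xs) = sfoldl sf st (st z x) xs

-- sfoldl' with the argument order sf st done z. Once 'done' is True, the
-- seed is returned without inspecting the list.
sfoldl' :: (x -> z -> Bool) -> (z -> x -> z) -> Bool -> z -> [x] -> z
sfoldl' _ _ True z _ = z
sfoldl' sf st done z (x:xs)
  | sf x z    = sfoldl' sf st True z xs
  | otherwise = sfoldl' sf st done (st z x) xs
```

The extra Bool turns "stop now" from a control-flow decision into data, which is what the next step exploits.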
We can now introduce

     sfoldlf' sf st p done z =def= (sfoldl' sf st done z) . (filter p)

which obeys the equation

     sfoldlf' sf st p done z (x:xs)
 === if done then z
     else if p x then (if sf x z then sfoldlf' sf st p True z xs
                                 else sfoldlf' sf st p done (st z x) xs)
     else sfoldlf' sf st p done z xs
 === { pushing conditions inside }
     sfoldlf' sf st p (done || (p x && sf x z)) (ss sf st p done z x) xs
   where
     ss sf st p done z x = if done then z
                           else if p x then (if sf x z then z else st z x)
                           else z

Something remarkable has happened: the recursion is now unconditional. Whatever the values of done, p x and sf x z, sfoldlf' makes exactly one tail call on xs, in the manner of foldl. We should note that the introduction of a 'done' argument is not unlike a common trick in physics of converting a value into a functional with the help of the Dirac delta-function. We can now write

       sfoldlfs sf1 st1 done1 z1 sf2 st2 done2 z2 (p,q) trans state@(x,y)
 =def= (sfoldl' sf1 st1 done1 z1, sfoldl' sf2 st2 done2 z2) $^$ stream trans (p,q) (x,y)
 ===   { definition of stream, as before }
       ((sfoldl' sf1 st1 done1 z1) . (filter p) . (x :),
        (sfoldl' sf2 st2 done2 z2) . (filter q) . (y :)) $^$ stream trans (p,q) $ trans (x,y)
 ===   { definition of sfoldlf' }
       (\tail -> sfoldlf' sf1 st1 p done1 z1 (x:tail),
        \tail -> sfoldlf' sf2 st2 q done2 z2 (y:tail)) $^$ stream trans (p,q) $ trans (x,y)
 ===   { equation for sfoldlf' }
       (\tail -> sfoldlf' sf1 st1 p (done1 || (p x && sf1 x z1)) (ss sf1 st1 p done1 z1 x) tail,
        \tail -> sfoldlf' sf2 st2 q (done2 || (q y && sf2 y z2)) (ss sf2 st2 q done2 z2 y) tail)
       $^$ stream trans (p,q) $ trans (x,y)
 ===   { eta reduction }
       (sfoldlf' sf1 st1 p (done1 || (p x && sf1 x z1)) (ss sf1 st1 p done1 z1 x),
        sfoldlf' sf2 st2 q (done2 || (q y && sf2 y z2)) (ss sf2 st2 q done2 z2 y))
       $^$ stream trans (p,q) $ trans (x,y)
 ===   { definition of sfoldlfs; the tails produced by stream are already
         filtered, and filter p . filter p === filter p }
       sfoldlfs sf1 st1 (done1 || (p x && sf1 x z1)) (ss sf1 st1 p done1 z1 x)
                sf2 st2 (done2 || (q y && sf2 y z2)) (ss sf2 st2 q done2 z2 y)
                (p,q) trans (trans state)

And we have obtained a recursive (indeed, tail-recursive) solution that builds no lists whatsoever. We have achieved full deforestation! Now we just have to program it:
ss :: (x->z->Bool) -> (z->x->z) -> (x->Bool) -> Bool -> z -> x -> z
ss sf st p done z x = if done then z
                      else if p x then (if sf x z then z else st z x)
                      else z
*> sfoldlfs sf1 st1 True z1 sf2 st2 True z2 (p,q) trans state = (z1,z2)
*> sfoldlfs sf1 st1 done1 z1 sf2 st2 done2 z2 flt@(p,q) trans state@(x,y) =
*>   sfoldlfs sf1 st1 (done1 || (p x && sf1 x z1)) (ss sf1 st1 p done1 z1 x)
*>            sf2 st2 (done2 || (q y && sf2 y z2)) (ss sf2 st2 q done2 z2 y)
*>            flt trans (trans state)

Or, factoring out common arguments:
sfoldlfs sf1 st1 done1 z1 sf2 st2 done2 z2 flt@(p,q) trans state =
    aux done1 z1 done2 z2 state
  where
    aux True z1 True z2 _ = (z1,z2)
    aux done1 z1 done2 z2 state@(x,y) =
      aux (done1 || (p x && sf1 x z1)) (ss sf1 st1 p done1 z1 x)
          (done2 || (q y && sf2 y z2)) (ss sf2 st2 q done2 z2 y)
          (trans state)
consumers_sum'' trans n =
    (fst,fst) $^$ sfoldlfs sf st False (0,0) sf st False (0,0) (st_p,st_q) trans (0,0)
  where sf x (_,count) = count >= n
        st (sum,count) val = (val+sum,count+1)
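To check that the derivation preserved the meaning, the following sketch assembles the forested and the deforested versions from the definitions above and compares them for small n. A few type signatures are added and some local variables renamed to avoid shadowing; the harness itself is our own assumption, not part of the original code.

```haskell
infixr 0 $^$

-- Componentwise application; fst/snd keep the match on the state lazy.
($^$) :: (a -> c, b -> d) -> (a, b) -> (c, d)
(f, g) $^$ state = (f $ fst state, g $ snd state)

stream :: ((a, b) -> (a, b)) -> (a -> Bool, b -> Bool) -> (a, b) -> ([a], [b])
stream transition (p, q) state@(xstate, ystate) =
  (filter p . (xstate :), filter q . (ystate :))
    $^$ (stream transition (p, q) $ transition state)

trans1 :: (Integer, Integer) -> (Integer, Integer)
trans1 (x, y) = (y + 1, x)

st_p, st_q :: Integer -> Bool
st_p x = x `mod` 1997 == 0
st_q = const True

sfoldl :: (x -> z -> Bool) -> (z -> x -> z) -> z -> [x] -> z
sfoldl sf st z (x:_) | sf x z = z
sfoldl sf st z (x:xs) = sfoldl sf st (st z x) xs

ss :: (x -> z -> Bool) -> (z -> x -> z) -> (x -> Bool) -> Bool -> z -> x -> z
ss sf st p done z x = if done then z
                      else if p x then (if sf x z then z else st z x)
                      else z

-- Forested version: two filtered streams, each consumed by sfoldl.
consumers_sum' stream n = (fst, fst) $^$ (f, f) $^$ stream trans1 (st_p, st_q) (0, 0)
  where
    f = sfoldl sf st (0, 0)
    sf _ (_, count) = count >= n
    st (s, count) val = (val + s, count + 1)

-- Deforested version: a single tail-recursive loop over pre-stream states.
sfoldlfs sf1 st1 done1 z1 sf2 st2 done2 z2 (p, q) trans state =
    aux done1 z1 done2 z2 state
  where
    aux True a1 True a2 _ = (a1, a2)
    aux d1 a1 d2 a2 s@(x, y) =
      aux (d1 || (p x && sf1 x a1)) (ss sf1 st1 p d1 a1 x)
          (d2 || (q y && sf2 y a2)) (ss sf2 st2 q d2 a2 y)
          (trans s)

consumers_sum'' trans n =
    (fst, fst) $^$ sfoldlfs sf st False (0, 0) sf st False (0, 0) (st_p, st_q) trans (0, 0)
  where
    sf _ (_, count) = count >= n
    st (s, count) val = (val + s, count + 1)
```

For n = 8 both versions return (31952,12), the value shown in the GHCi session that follows.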
Let us now compare the forested version consumers_sum' with the deforested version consumers_sum'' in performance, using GHCi:

*Main> consumers_sum' stream 8
(31952,12)
(0.26 secs, 11934436 bytes)
*Main> consumers_sum'' trans1 8
(31952,12)
(0.18 secs, 7695084 bytes)
*Main> consumers_sum' stream 16
(127808,56)
(0.86 secs, 32660328 bytes)
*Main> consumers_sum'' trans1 16
(127808,56)
(0.32 secs, 14354768 bytes)
*Main> consumers_sum'' trans1 16
(127808,56)
(0.32 secs, 14579336 bytes)
*Main> consumers_sum' stream 32
(511232,240)
(3.32 secs, 97786900 bytes)
*Main> consumers_sum'' trans1 32
(511232,240)
(0.69 secs, 28490268 bytes)

It appears that the deforestation was worth it: the resulting code needs less memory and executes notably faster. In another test, we compile the code with optimization and profiling enabled:
main = do print $ consumers_sum' stream 32
          print $ consumers_sum'' trans1 32
The results indicate that consumers_sum'' is responsible for only 15.4% of the time and 13.7% of the allocations (most of which are due to the transition function). What's more exhilarating, the retainer profile of "print $ consumers_sum'' trans1 32" shows nothing: nothing is retained. In contrast, the retainer profile of "print $ consumers_sum' stream 32" shows a large space leak, 12 MB of retained memory.

But what about the original question by Jorge Adriano? Can we write two streams into two different files, and _safely_ interleave the i/o? Yes, we can. We need a version of sfoldlfs that shares a single seed between the two consumers:
sfoldlfs1 sf1 st1 done1 sf2 st2 done2 flt@(p,q) trans z state =
    aux done1 done2 z state
  where
    aux True True z _ = z
    aux done1 done2 z state@(x,y) =
      aux (done1 || (p x && sf1 x z)) (done2 || (q y && sf2 y z))
          (ss sf2 st2 q done2 (ss sf1 st1 p done1 z x) y)
          (trans state)
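Because both consumers thread the same seed, that seed may hold both accumulators at once. The sketch below is hypothetical: the record Acc and the function bothSums are ours, chosen to recompute the two sums of consumers_sum'' through sfoldlfs1 and a single pure seed.

```haskell
trans1 :: (Integer, Integer) -> (Integer, Integer)
trans1 (x, y) = (y + 1, x)

st_p, st_q :: Integer -> Bool
st_p x = x `mod` 1997 == 0
st_q = const True

ss :: (x -> z -> Bool) -> (z -> x -> z) -> (x -> Bool) -> Bool -> z -> x -> z
ss sf st p done z x = if done then z
                      else if p x then (if sf x z then z else st z x)
                      else z

-- sfoldlfs1 as in the text: both consumers thread one shared seed.
sfoldlfs1 sf1 st1 done1 sf2 st2 done2 (p, q) trans z state = aux done1 done2 z state
  where
    aux True True acc _ = acc
    aux d1 d2 acc s@(x, y) =
      aux (d1 || (p x && sf1 x acc)) (d2 || (q y && sf2 y acc))
          (ss sf2 st2 q d2 (ss sf1 st1 p d1 acc x) y)
          (trans s)

-- Hypothetical shared seed: both running sums and both counts.
data Acc = Acc { sum1, sum2 :: Integer, count1, count2 :: Int }
  deriving (Eq, Show)

-- Hypothetical consumer pair: each consumer touches only its own fields.
bothSums :: Int -> (Integer, Integer)
bothSums n = (sum1 acc, sum2 acc)
  where
    acc = sfoldlfs1 stop1 step1 False stop2 step2 False (st_p, st_q) trans1
                    (Acc 0 0 0 0) (0, 0)
    stop1 _ a = count1 a >= n
    stop2 _ a = count2 a >= n
    step1 a v = a { sum1 = sum1 a + v, count1 = count1 a + 1 }
    step2 a v = a { sum2 = sum2 a + v, count2 = count2 a + 1 }
```

Since each consumer reads and updates only its own fields, the order in which sfoldlfs1 applies the two updates does not affect the result; bothSums 8 gives (31952,12), as consumers_sum'' does.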
write_in_two_files file1 file2 trans max =
    (\(h1,h2) -> hClose h1 >> hClose h2) =<<
      sfoldlfs1 sf (st fst) False sf (st snd) False (st_p,st_q) trans
        (do h1 <- openFile file1 WriteMode
            h2 <- openFile file2 WriteMode
            return (h1,h2))
        (0,0)
  where
    sf x _ = x >= max
    st selector two_handles val = do
      handles <- two_handles
      hPrint (selector handles) val
      return handles
*> main = write_in_two_files "/tmp/a1" "/tmp/a2" trans1 80000

The profile shows that the i/o is indeed interleaved.
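The interleaving can also be observed without files. In the following sketch, a hypothetical variant of write_in_two_files, the seed action yields a pair of tags instead of two handles, and each write appends to an IORef log (Data.IORef) instead of calling hPrint. As in write_in_two_files, the seed is an IO action that sfoldlfs1 extends step by step.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef)

trans1 :: (Integer, Integer) -> (Integer, Integer)
trans1 (x, y) = (y + 1, x)

st_p, st_q :: Integer -> Bool
st_p x = x `mod` 1997 == 0
st_q = const True

ss :: (x -> z -> Bool) -> (z -> x -> z) -> (x -> Bool) -> Bool -> z -> x -> z
ss sf st p done z x = if done then z
                      else if p x then (if sf x z then z else st z x)
                      else z

sfoldlfs1 sf1 st1 done1 sf2 st2 done2 (p, q) trans z state = aux done1 done2 z state
  where
    aux True True acc _ = acc
    aux d1 d2 acc s@(x, y) =
      aux (d1 || (p x && sf1 x acc)) (d2 || (q y && sf2 y acc))
          (ss sf2 st2 q d2 (ss sf1 st1 p d1 acc x) y)
          (trans s)

-- Hypothetical variant of write_in_two_files: tags stand in for handles,
-- and each "write" appends (tag, value) to an in-memory log. The seed is
-- an IO action built up by sfoldlfs1 and run once the loop has finished.
logInTwoStreams :: Integer -> IO [(String, Integer)]
logInTwoStreams limit = do
  logRef <- newIORef []
  let sf x _ = x >= limit
      st selector twoTags val = do
        tags <- twoTags
        modifyIORef logRef ((selector tags, val) :)
        return tags
  _ <- sfoldlfs1 sf (st fst) False sf (st snd) False (st_p, st_q) trans1
                 (return ("file1", "file2")) (0, 0)
  reverse <$> readIORef logRef
```

With limit 2000, the log begins ("file1",0), ("file2",0), ("file2",0), and the first ("file1",1997) entry arrives after 3994 earlier writes: the writes follow the order of production, step by step, rather than one file being written after the other.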