Patterns for processing large but finite streams

Hi,
I'm rewriting timeplot to avoid holding the whole input in memory, and naturally a problem arises:
How to represent large but finite streams and functions that process them, returning other streams or some kinds of aggregate values?
Examples:
* Adjacent differences of a stream of numbers
* Given a stream of numbers with times, split it into buckets by time of given width and produce a stream of (bucket, 50%, 75% and 90% quantiles in this bucket)
* Sum a stream of numbers
Is this, perhaps, what comonads are for? Or iteratees?
--
Eugene Kirpichov
Principal Engineer, Mirantis Inc. http://www.mirantis.com/
Editor, http://fprog.ru/

At Fri, 1 Jul 2011 09:39:32 +0400, Eugene Kirpichov wrote:
Is this, perhaps, what comonads are for? Or iteratees?
Sounds like a good job for iteratees. Summing a stream of numbers is just an Iteratee. Transcoding a stream into another stream is a job for an Inum (Iteratee-enumerator) or enumeratee, depending on which package's nomenclature you use.
You have three implementations to choose from:
- http://hackage.haskell.org/package/iteratee (original)
- http://hackage.haskell.org/package/enumerator (John Millikin's re-write)
- http://hackage.haskell.org/package/iterIO (my 3rd-generation attempt)
David
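To make the terminology above concrete, here is a minimal hand-rolled sketch of the idea. It is not the API of iteratee, enumerator or iterIO; the names `Iter`, `sumI`, `mapI` and `run` are hypothetical and chosen only for illustration. A consumer either has a result or waits for the next element, and a transformer wraps a consumer so that it accepts a different element type.

```haskell
-- A consumer is either finished with a result, or waiting for more input.
-- Nothing signals the end of the input.
data Iter a b = Done b | More (Maybe a -> Iter a b)

-- Summing a stream: keep the running total in the continuation.
sumI :: Num a => Iter a a
sumI = go 0
  where
    go s = More (maybe (Done s) (\x -> go $! s + x))

-- A very small "enumeratee"-like transformer: apply f to every element
-- before handing it to the inner consumer.
mapI :: (a -> b) -> Iter b r -> Iter a r
mapI _ (Done r) = Done r
mapI f (More k) = More (mapI f . k . fmap f)

-- Hypothetical driver: feed a list to a consumer, one element at a time.
-- Returns Nothing if the consumer still wants input after end of stream.
run :: Iter a b -> [a] -> Maybe b
run (Done b) _ = Just b
run (More k) (x:xs) = run (k (Just x)) xs
run (More k) [] = case k Nothing of
  Done b -> Just b
  More _ -> Nothing
```

For example, `run (mapI (* 2) sumI) xs` doubles each element and sums the results in one pass. The real packages add effects, chunking and error handling on top of this basic shape.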

Eugene Kirpichov wrote:
Is this, perhaps, what comonads are for? Or iteratees?
Plain old lazy lists?
Best regards,
Heinrich Apfelmus
-- http://apfelmus.nfshost.com

Plain old lazy lists do not allow me to combine multiple concurrent
computations, e.g. I cannot define average from sum and length.

Sure you can.
runningAverage :: Int -> [Double] -> [Double]
runningAverage _ [] = []  -- stop at the end of the input
runningAverage n xs =
  let chunk = take n xs
  in  (sum chunk / fromIntegral (length chunk)) : runningAverage n (tail xs)
Lazy lists are absolutely ideal for this purpose.
Regards, Malcolm
On 1 Jul 2011, at 07:33, Eugene Kirpichov wrote:
Plain old lazy lists do not allow me to combine multiple concurrent computations, e.g. I cannot define average from sum and length.

I meant the average of the whole list - given a sumS and lengthS ("S"
for "Stream"), write meanS as something like liftS2 (/) sumS lengthS.
Or is that possible with lazy lists too?
(looks like arrows actually - which arrow is appropriate here?)

On Fri, Jul 1, 2011 at 12:21 PM, Eugene Kirpichov wrote:
I meant the average of the whole list - given a sumS and lengthS ("S" for "Stream"), write meanS as something like liftS2 (/) sumS lengthS.
Or is that possible with lazy lists too?
Sure you can. Sum, length and mean can each be calculated as a left fold. If you need to calculate more than one statistic at a time, you can combine the accumulators:
sum = foldl (+) 0
length = foldl (\n _ -> n+1) 0
data Mean = Mean Double Int
mean = foldl (\(Mean m n) x -> Mean (m + (x - m) / fromIntegral (n+1)) (n+1)) (Mean 0 0)
AFAIU iteratees basically use the same technique.
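A runnable variant of the single-pass mean described above might look like the following sketch. The strict fields, the use of `foldl'` and the helper name `stepMean` are assumptions added here so that the accumulator stays evaluated on long inputs; they are not part of the original message.

```haskell
import Data.List (foldl')

-- Running mean and element count. The strict fields (!) make sure each
-- step is evaluated immediately instead of building a chain of thunks.
data Mean = Mean !Double !Int

-- Incremental update: move the mean towards x by 1/(n+1) of the distance.
stepMean :: Mean -> Double -> Mean
stepMean (Mean m n) x = Mean (m + (x - m) / fromIntegral (n + 1)) (n + 1)

-- Mean of the whole list in one pass; an empty list yields 0.
mean :: [Double] -> Double
mean xs = case foldl' stepMean (Mean 0 0) xs of
  Mean m _ -> m
```

The list is consumed once and can be garbage-collected as the fold advances, provided nothing else holds a reference to it.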

Alexey, your definition of "mean" does not look like "liftS2 (/) sum
length" - you have to manually "fuse" these computations.
I'm asking for a formalism that does this fusion automatically (and
guaranteedly).

On Fri, Jul 1, 2011 at 12:54 PM, Eugene Kirpichov wrote:
Alexey, your definition of "mean" does not look like "liftS2 (/) sum length" - you have to manually "fuse" these computations.
Well it was fused for numerical stability
I'm asking for a formalism that does this fusion automatically (and guaranteedly).
Joining accumulators is quite straightforward, and so is joining the initial states. Just create a combinator like this:
joinAcc :: (acc1 -> x -> acc1) -> (acc2 -> x -> acc2) -> (acc1,acc2) -> x -> (acc1,acc2)
joinAcc f1 f2 (s1,s2) x = (f1 s1 x, f2 s2 x)
Still, you have to handle them separately.
sum' = foldl (+) 0
len = foldl (\n _ -> n+1) 0
sumLen = foldl (joinAcc (+) (\n _ -> n+1)) (0,0)
There is a more regular approach, but it only works with statistics (functions that do not depend on the order of elements in the sample). For every statistic, a monoid for its evaluation can be constructed. For example, sum:
newtype Sum a = Sum a
-- GHC 8.4 and later also require a Semigroup instance
instance Num a => Semigroup (Sum a) where
  Sum a <> Sum b = Sum (a+b)
instance Num a => Monoid (Sum a) where
  mempty = Sum 0
Composition of these monoids becomes trivial. Just use
I pursued this approach in the monoid-statistics[1] package. It's reasonably well documented.
[1] http://hackage.haskell.org/package/monoid-statistics
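One way to read the monoid approach above, sketched with base only: a pair of monoids is itself a monoid, so two statistics can be accumulated together by mapping each element to a pair. This is an assumption about what composition could look like, not the API of monoid-statistics; the names `observe`, `sumAndCount` and `meanM` are hypothetical.

```haskell
import Data.List (foldl')
import Data.Monoid (Sum (..))

-- One element's contribution to both statistics at once.
-- The Monoid instance for pairs comes from base.
observe :: Double -> (Sum Double, Sum Int)
observe x = (Sum x, Sum 1)

-- Combine all contributions in a single left fold. Both components are
-- forced at every step so that the pair does not accumulate thunks.
sumAndCount :: [Double] -> (Sum Double, Sum Int)
sumAndCount = foldl' step mempty
  where
    step acc x =
      let (s, n) = acc <> observe x
      in  s `seq` n `seq` (s, n)

-- Mean derived from the combined accumulator (NaN for an empty list).
meanM :: [Double] -> Double
meanM xs =
  let (Sum s, Sum n) = sumAndCount xs
  in  s / fromIntegral n
```

Because the combination is associative, partial results from separate chunks of the input can also be merged with `<>`, which is what makes the approach order-independent.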

Thanks but I'm afraid that's still not quite what I'm looking for;
guess I'll have to define my desire by my implementation - so once
it's ready I'll show the result to cafe :)

This sounds exactly like what attribute grammars, like the system
developed at Utrecht University [1], are useful for.
Erik
[1] http://www.cs.uu.nl/wiki/HUT/AttributeGrammarSystem
On Fri, Jul 1, 2011 at 10:54, Eugene Kirpichov wrote:
Alexey, your definition of "mean" does not look like "liftS2 (/) sum length" - you have to manually "fuse" these computations.
I'm asking for a formalism that does this fusion automatically (and guaranteedly).

Eugene Kirpichov wrote:
Plain old lazy lists do not allow me to combine multiple concurrent computations, e.g. I cannot define average from sum and length.
I meant the average of the whole list - given a sumS and lengthS ("S" for "Stream"), write meanS as something like liftS2 (/) sumS lengthS.
Or is that possible with lazy lists too?
(looks like arrows actually - which arrow is appropriate here?)
That's a very good point. Just to clarify for everyone: Eugene wants to write the function average almost *literally* as
average xs = sum xs / length xs
but he wants the functions sum and length to fuse, so that the input stream xs is *not* shared as a whole.
I have thought about this problem for a while actually and have observed the following:
1) You are not looking for a representation of streams, but for a representation of *functions* on streams. The essence of a function on streams is its case analysis of the input. Hence, the simplest solution is to make the case analysis explicit:
data StringTo a = CaseOf a (Char -> StringTo a)
-- function on a stream (here: String)
interpret :: StringTo a -> (String -> a)
interpret (CaseOf nil cons) [] = nil
interpret (CaseOf nil cons) (x:xs) = interpret (cons x) xs
-- Functor is a superclass of Applicative in current GHC
instance Functor StringTo where
  fmap f (CaseOf nil cons) = CaseOf (f nil) (fmap f . cons)
instance Applicative StringTo where
  pure a = CaseOf a (const $ pure a)
  (CaseOf nil1 cons1) <*> (CaseOf nil2 cons2) =
    CaseOf (nil1 $ nil2) (\c -> cons1 c <*> cons2 c)
length = go 0 where go n = CaseOf n (\_ -> go $! n+1)
average = liftA2 (/) sum length
In other words, if you reify the case .. of expressions, you will be able to fuse them.
2) If Haskell were to support some kind of evaluation under the lambda (partial evaluation, head normal form instead of weak head normal form), it would be unnecessary to make the case expressions explicit. Rather, the applicative instance could be written as follows
instance Applicative ((->) String) where
  pure a = const a
  f <*> x = \cs -> case cs of
    [] -> f [] $ x []
    (c:cs) ->
      let f' cs = f (c:cs) -- partial evaluation on this
          x' cs = x (c:cs)
      in  f' `partialseq` x' `partialseq` (f' <*> x') cs
We could simply write
average = liftA2 (/) sum length
and everything would magically fuse.
3) John Hughes has already thought about this problem in his PhD thesis. :) (but it is not available for download on the internet, unfortunately. :( ). His solution was a SYNCHLIST primitive in conjunction with some sort of parallelism PAR. Basically, the SYNCHLIST primitive only allows simultaneous access to the input stream and the parallelism is used to make that simultaneity happen.
Best regards,
Heinrich Apfelmus
-- http://apfelmus.nfshost.com
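The reified case analysis from point 1 can be turned into a small complete program. The sketch below generalises `StringTo` to an arbitrary element type (called `StreamTo` here, a hypothetical name), supplies the `sum` that the message leaves undefined, and converts the length with `fromIntegral` so that the division type-checks. These additions are assumptions made for illustration.

```haskell
import Control.Applicative (liftA2)
import Prelude hiding (length, sum)

-- A reified function from a stream of e to a result a: the answer for the
-- empty stream, plus a continuation that consumes one more element.
data StreamTo e a = CaseOf a (e -> StreamTo e a)

-- Turn the reified function back into an ordinary function on lists.
interpret :: StreamTo e a -> [e] -> a
interpret (CaseOf nil _) [] = nil
interpret (CaseOf _ cons) (x:xs) = interpret (cons x) xs

instance Functor (StreamTo e) where
  fmap f (CaseOf nil cons) = CaseOf (f nil) (fmap f . cons)

-- Both arguments see each element in lockstep, so the input is
-- traversed only once.
instance Applicative (StreamTo e) where
  pure a = CaseOf a (const (pure a))
  CaseOf nil1 cons1 <*> CaseOf nil2 cons2 =
    CaseOf (nil1 nil2) (\c -> cons1 c <*> cons2 c)

sum :: StreamTo Double Double
sum = go 0
  where
    go s = CaseOf s (\x -> go $! s + x)

length :: StreamTo e Int
length = go 0
  where
    go n = CaseOf n (\_ -> go $! n + 1)

-- Written almost literally as "sum / length".
average :: StreamTo Double Double
average = liftA2 (/) sum (fmap fromIntegral length)
```

With these definitions, `interpret average xs` walks `xs` a single time while updating both accumulators, which is the fusion the thread is asking for.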

Hi,
You're right, reifying stream processing functions seems indeed the
way to go - and that looks even more like arrows :)
I thought of something like this:
data SP i o = Yield [o] (Maybe (Maybe i -> SP i o))
"Scalar" functions like sum and length are just SP's that return a
single item in the output stream.
sum :: (Num a) => SP a a
sum = sum' 0
  where sum' s = Yield [] $ Just $ maybe (Yield [s] Nothing) (sum' . (s+))
Adjacent differences would be like "liftA2 (-) input laggedInput"
laggedInput would be like:
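laggedInput :: SP i i
laggedInput = li Nothing
  where
    -- maybe2list is presumably Data.Maybe.maybeToList; empty is assumed to
    -- stand for a finished processor such as Yield [] Nothing
    li maybePrev = Yield (maybe2list maybePrev) $ Just $ maybe empty (li . Just)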
Looks like this can be made into an instance of Arrow and can be composed etc.
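A runnable sketch of the `SP` idea above follows. The type is taken from the message; the driver `runSP` and the processors `sumSP`, `lagged` and `diffs` are hypothetical names written for illustration. Note that `lagged` is arranged so that the previous element is emitted when the next one arrives, which is one possible reading of "lagged input".

```haskell
import Data.Maybe (maybeToList)

-- Some output now, then either stop (Nothing) or wait for the next input.
-- The input is wrapped in Maybe so that Nothing can signal end of stream.
data SP i o = Yield [o] (Maybe (Maybe i -> SP i o))

-- Hypothetical driver: feed a list through a processor, collecting output.
-- After end of input, only the immediate final output is taken.
runSP :: SP i o -> [i] -> [o]
runSP (Yield out Nothing) _ = out
runSP (Yield out (Just k)) (x:xs) = out ++ runSP (k (Just x)) xs
runSP (Yield out (Just k)) [] = out ++ finish (k Nothing)
  where
    finish (Yield o _) = o

-- A "scalar" processor: emits a single item once the input ends.
sumSP :: Num a => SP a a
sumSP = go 0
  where
    go s = Yield [] (Just (maybe (Yield [s] Nothing) (\x -> go $! s + x)))

-- Emits the previous element each time a new one arrives.
lagged :: SP i i
lagged = Yield [] (await Nothing)
  where
    await prev = Just $ \mi -> case mi of
      Nothing -> Yield [] Nothing
      Just x  -> Yield (maybeToList prev) (await (Just x))

-- Adjacent differences, written directly rather than via liftA2.
diffs :: Num a => SP a a
diffs = Yield [] (await Nothing)
  where
    await prev = Just $ \mi -> case mi of
      Nothing -> Yield [] Nothing
      Just x  -> Yield [x - p | p <- maybeToList prev] (await (Just x))
```

Because `runSP` produces output lazily with `++`, a downstream consumer can process results as they appear without the whole input being held in memory.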

Eugene Kirpichov wrote:
2011/7/1 Heinrich Apfelmus:
Eugene Kirpichov wrote:
I'm rewriting timeplot to avoid holding the whole input in memory, and naturally a problem arises:
Plain old lazy lists?
Heretic! :-)
I have written a bunch of programs that do things that way, and I think it works pretty well with a couple of caveats:
1. Make sure you collect data into strict data structures. Dangerous operations are addition and anything involving Data.Map. And use foldl'.
2. If you plan on working on multiple files, extra care might be needed to close them, or you'll run out of file descriptors.
As long as you avoid these pitfalls, the advantage is very clean and simple code.
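The first caveat can be illustrated with a small sketch that counts samples per time bucket. It assumes the containers package is available (it ships with GHC) and uses `Data.Map.Strict` so that the counts are evaluated as they are inserted; the function name `bucketCounts` is hypothetical.

```haskell
import Data.List (foldl')
import qualified Data.Map.Strict as M

-- Count how many timestamps fall into each bucket of the given width.
-- foldl' forces the map at every step, and the strict insertWith forces
-- each updated count, so no chain of (+) thunks accumulates.
bucketCounts :: Double -> [Double] -> M.Map Int Int
bucketCounts width = foldl' add M.empty
  where
    add m t = M.insertWith (+) (floor (t / width)) 1 m
```

With the lazy `Data.Map` API and plain `foldl`, the same code could build up unevaluated additions for every key, which is the kind of leak the caveat warns about.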
Plain old lazy lists do not allow me to combine multiple concurrent computations, e.g. I cannot define average from sum and length.
Yes, this is clunky. I'm not aware of any good solution.
-k
--
If I haven't seen further, it is by standing in the footprints of giants
participants (7)
- Alexey Khudyakov
- dm-list-haskell-cafe@scs.stanford.edu
- Erik Hesselink
- Eugene Kirpichov
- Heinrich Apfelmus
- Ketil Malde
- Malcolm Wallace