
Hello, I would like to create a function which does
mkCube :: [IO a] -> IO a
mkCube = ...
I want to fold all this concurrently in order to obtain the final a
I have a function like this
merge :: a -> a -> IO a
in order to sort of merge two a into another one.
I did not find anything like this on Hoogle, so I would like your advice on how to write mkCube.
Thanks
Frederic

On Fri, Nov 15, 2019 at 05:44:53PM +0000, PICCA Frederic-Emmanuel wrote:
Hello, I would like to create a function which does
mkCube :: [IO a] -> IO a
mkCube = ...
I want to fold all this concurrently in order to obtain the final a
Is the list guaranteed non-empty? Do you want to enforce that at the type level (compile time), or "fail" at runtime when it is empty?
You should probably be a bit more explicit about what you mean by "concurrently". Do you know in advance that the list length is sufficiently short to make it reasonable to immediately fork an async thread for each?
Also, do you want the outputs to be folded in list order, or in any order (e.g. roughly in order of IO action completion)?
I have a function like this
merge :: a -> a -> IO a
in order to sort of merge two a into another one.
Is `merge` really an IO action, or did you mean "a -> a -> a"?
Most if not all of the building blocks for this are in the async package, but putting them together correctly generally depends on the details of your use-case.
-- Viktor.
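
To make the simplest combination of those building blocks concrete, here is a minimal sketch using `mapConcurrently` from the async package. It assumes the list is short enough to fork one thread per action, that folding in list order is wanted, and that failing at runtime on an empty list is acceptable. Passing `merge` as an argument is a choice made for this illustration, not the signature asked for above.

```haskell
import Control.Concurrent.Async (mapConcurrently)
import Control.Monad (foldM)

-- | Run every action concurrently, then fold the results in list order.
-- Assumption: one thread per action is affordable (short list).
mkCube :: (a -> a -> IO a) -> [IO a] -> IO a
mkCube merge actions = do
    results <- mapConcurrently id actions
    case results of
        []       -> ioError (userError "mkCube: empty list")
        (x : xs) -> foldM merge x xs

main :: IO ()
main = do
    total <- mkCube (\x y -> pure (x + y)) (map pure [1 .. 10 :: Int])
    print total  -- prints 55
```

Only the actions run concurrently here; the merges happen sequentially once all results are available, which is why the answers to the questions above matter for anything larger.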

Is the list guaranteed non-empty? Do you want to enforce that at the type level (compile time), or "fail" at runtime when it is empty?
You should probably be a bit more explicit about what you mean by "concurrently". Do you know in advance that the list length is sufficiently short to make it reasonable to immediately fork an async thread for each?
Also, do you want the outputs to be folded in list order, or in any order (e.g. roughly in order of IO action completion)?
The real problem is:
I have a bunch of HDF5 files which contain a stack of images and other metadata.
For each image and its associated metadata, I can create a cube (3D array), which is the binning in 3D space for each image of the stack:
read image + metadata -> transformation in 3D space -> binning.
then
binning -> binning -> binning (this is the monoid). Since this is a pure computation, I can use unsafe IO to create the merge function.
In my case, I want to distribute all this over all my cores.
Each core can merge binnings in whichever order until I have only one binning.
[a1, a2, a3, a4]
core1: a1 + a2 -> a12
core2: a3 + a4 -> a34
then
first core available, a12 + a34 -> a1234
Is it clearer?
Fred

On Fri, Nov 15, 2019 at 08:20:44PM +0000, PICCA Frederic-Emmanuel wrote:
Is the list guaranteed non-empty? Do you want to enforce that at the type level (compile time), or "fail" at runtime when it is empty?
?
You should probably be a bit more explicit about what you mean by "concurrently". Do you know in advance that the list length is sufficiently short to make it reasonable to immediately fork an async thread for each?
?
Also, do you want the outputs to be folded in list order, or in any order (e.g. roughly in order of IO action completion)?
The real problem is:
I have a bunch of HDF5 files which contain a stack of images and other metadata.
Is that at least one? O(1) per core, ... hundreds, tens of thousands?
For each image and its associated metadata, I can create a cube (3D array), which is the binning in 3D space
Do you have space constraints on the number of not yet folded together cubes that can be in memory at the same time?
binning -> binning -> binning (this is the monoid), since this is pure computation, I can use unsafe IO to create the merge function.
Is it actually a Monoid (has an identity), or only a Semigroup?
In my case, I want to distribute all this over all my cores.
Which is the (more) expensive operation, computing a cube or merging two already computed cubes?
Each core can merge binnings in whichever order until I have only one binning.
[a1, a2, a3, a4]
core1: a1 + a2 -> a12
core2: a3 + a4 -> a34
then
first core available, a12 + a34 -> a1234
That is still ultimately order preserving (but associative):
(a1 + a2) + (a3 + a4).
Is the semigroup also commutative, would:
(a2 + a4) + (a1 + a3)
also work?
-- Viktor.
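
The order-preserving pairwise scheme, (a1 + a2) + (a3 + a4), can be sketched with `mapConcurrently` from the async package. The names `Slot`, `pairUp` and `treeReduce` are hypothetical helpers made up for this illustration. The sketch proceeds in rounds, so it waits for a whole round to finish before starting the next one, which is a simplification of "first core available".

```haskell
import Control.Concurrent.Async (mapConcurrently)

-- | One slot of a reduction round: a leftover element or an adjacent pair.
data Slot a = One a | Two a a

-- | Group neighbours two by two, keeping the original order.
pairUp :: [a] -> [Slot a]
pairUp (x : y : rest) = Two x y : pairUp rest
pairUp [x]            = [One x]
pairUp []             = []

-- | Merge adjacent pairs concurrently, round after round, until at most
-- one value is left. Only associativity of the merge is relied upon.
treeReduce :: (a -> a -> IO a) -> [a] -> IO (Maybe a)
treeReduce _     []  = pure Nothing
treeReduce _     [x] = pure (Just x)
treeReduce merge xs  = mapConcurrently step (pairUp xs) >>= treeReduce merge
  where
    step (One x)   = pure x
    step (Two x y) = merge x y

main :: IO ()
main = do
    r <- treeReduce (\x y -> pure (x ++ y)) ["a1", "a2", "a3", "a4"]
    print r  -- prints Just "a1a2a3a4"
```

String concatenation is not commutative, so the output shows that list order survives the reduction. For the merges to actually run on several cores, the program has to be built with `-threaded` and run with `+RTS -N`.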

Is the list guaranteed non-empty? Do you want to enforce that at the type level (compile time), or "fail" at runtime when it is empty?
runtime error
You should probably be a bit more explicit about what you mean by "concurrently". Do you know in advance that the list length is sufficiently short to make it reasonable to immediately fork an async thread for each?
The list can have 100000 images of 1 million pixels each, and it is not that short per image.
Is that at least one? O(1) per core, ... hundreds, tens of thousands?
For now I have 100000 images to merge on 24 cores.
Do you have space constraints on the number of not yet folded together cubes that can be in memory at the same time?
On my computer I have 256 GB of memory, enough to load all the images first, but I need to target smaller computers. The biggest stack that I processed was 60 GB for a final cube of 2000x2000x2000 int32, but most of the time it will be around 1000x1000x1000 int32.
Is it actually a Monoid (has an identity), or only a Semigroup?
I can have an identity, which is an empty cube.
Which is the (more) expensive operation, computing a cube or merging two already computed cubes?
Computing a cube takes constant time for a given image size. Merging cubes gets slower and slower over time, since the cube grows from merge to merge.
That is still ultimately order preserving (but associative): (a1 + a2) + (a3 + a4). Is the semigroup also commutative, would: (a2 + a4) + (a1 + a3) also work?
yes
Fred

On Fri, Nov 15, 2019 at 09:29:58PM +0000, PICCA Frederic-Emmanuel wrote:
Is the list guaranteed non-empty? Do you want to enforce that at the type level (compile time), or "fail" at runtime when it is empty?
runtime error
If there's an identity (empty) cube, then an empty list can perhaps just map to that, but you could still throw an error.
You should probably be a bit more explicit about what you mean by "concurrently". Do you know in advance that the list length is sufficiently short to make it reasonable to immediately fork an async thread for each?
The list can have 100000 images of 1 million pixels each, and it is not that short per image.
This suggests that processing them should not happen all at once, but rather you'd load more images as idle task slots become available. This suggests use of 'mapReduce' from: https://hackage.haskell.org/package/async-pool-0.9.0.2/docs/Control-Concurre...
Is it actually a Monoid (has an identity), or only a Semigroup?
I can have an identity, which is an empty cube.
This is good, since mapReduce wants a Monoid.
Computing a cube takes constant time for a given image size. Merging cubes gets slower and slower over time, since the cube grows from merge to merge.
It looks like mapReduce also reduces pairs in parallel, and discards the inputs. It looks to be order preserving, so you don't even need commutativity.
I've not used this module myself, please post a summary of your experience.
-- Viktor.

On Fri, Nov 15, 2019 at 05:06:16PM -0500, Viktor Dukhovni wrote:
I've not used this module myself, please post a summary of your experience.
I was curious, so I decided to try a simple case:

    {-# LANGUAGE BlockArguments #-}
    {-# LANGUAGE BangPatterns #-}
    module Main (main) where

    import Control.Concurrent.Async.Pool
    import Control.Concurrent.STM
    import Control.Monad
    import Data.List
    import Data.Monoid
    import System.Environment

    defCount, batchSz :: Int
    defCount = 10000
    batchSz = 256

    -- Split a list into chunks of at most sz elements.
    batchList :: Int -> [a] -> [[a]]
    batchList sz as = case splitAt sz as of
        ([], _) -> []
        (t, []) -> [t]
        (h, t)  -> h : batchList sz t

    main :: IO ()
    main = do
        -- Element count from the first command-line argument, else defCount.
        n <- maybe defCount read <$> (fmap fst . uncons) <$> getArgs
        let bs = batchList batchSz $ map Sum [1..n]
        s <- foldM mergeReduce mempty bs
        print $ getSum s
      where
        -- Reduce one batch concurrently, then fold it into the accumulator.
        mergeReduce :: Sum Int -> [(Sum Int)] -> IO (Sum Int)
        mergeReduce !acc ms = (acc <>) <$> reduceBatch (return <$> ms)

        -- Run one batch in a task group of 8 and wait for the result.
        reduceBatch :: Monoid a => [IO a] -> IO a
        reduceBatch ms = withTaskGroup 8 $ (>>= wait) . atomically . flip mapReduce ms

Without batching, the whole list of actions is brought into memory, all at once (to create the task dependency graph), and then the outputs are folded concurrently, which does not run in constant memory in the size of the list.
In the above the list of actions is chunked (256 at a time), these are merged concurrently, but then the results from the chunks are merged sequentially. If the cost of storing the entire task list in memory is negligible, a single mapReduce may perform better:

    {-# LANGUAGE BlockArguments #-}
    module Main (main) where

    import Control.Concurrent.Async.Pool
    import Control.Concurrent.STM
    import Data.List
    import Data.Monoid
    import System.Environment

    defCount :: Int
    defCount = 100

    main :: IO ()
    main = do
        n <- maybe defCount read <$> (fmap fst . uncons) <$> getArgs
        withTaskGroup 8 \tg -> do
            -- Submit the whole list as one reduction, then wait for it.
            reduction <- atomically $ mapReduce tg $ map (return . Sum) [1..n]
            wait reduction >>= print . getSum

-- Viktor.
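
For readers without async-pool at hand, the batching strategy itself can be tried with the plain async package. The sketch below keeps `batchList` from the message above and swaps mapReduce for `mapConcurrently` followed by `mconcat`, so each batch forks one thread per action instead of using a task group; `foldBatches` is a hypothetical name for this illustration.

```haskell
{-# LANGUAGE BangPatterns #-}
import Control.Concurrent.Async (mapConcurrently)
import Control.Monad (foldM)
import Data.Monoid (Sum (..))

-- | Split a list into chunks of at most sz elements (as in the thread).
batchList :: Int -> [a] -> [[a]]
batchList sz as = case splitAt sz as of
    ([], _) -> []
    (t, []) -> [t]
    (h, t)  -> h : batchList sz t

-- | Run each batch concurrently, then fold the batch result into the
-- accumulator sequentially. Only one batch of results is alive at a time.
foldBatches :: Monoid m => Int -> [IO m] -> IO m
foldBatches sz = foldM step mempty . batchList sz
  where
    step !acc batch = (acc <>) . mconcat <$> mapConcurrently id batch

main :: IO ()
main = do
    print (batchList 3 [1 .. 7 :: Int])  -- prints [[1,2,3],[4,5,6],[7]]
    r <- foldBatches 256 (map (pure . Sum) [1 .. 10000 :: Int])
    print (getSum r)  -- prints 50005000
```

The batch size trades memory for parallelism: a larger batch keeps more results in memory at once, while a smaller one leaves cores idle more often while the slowest action of each batch finishes.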

Thanks a lot, I will try to implement this with mapReduce and I will post my solution :)
Cheers
Frederic
participants (2): PICCA Frederic-Emmanuel, Viktor Dukhovni