ArrowLoop and streamprocessors

Hi all,

I'm playing around a bit with arrows (more specifically, something like a CPS style streamprocessor as described in "Generalising Monads to Arrows" by John Hughes).

A part of my program takes inputs/signals from 2 sources. The sources don't produce output at the same rate, and this part is able to handle inputs independently. So I figured I need Either A B ~> O for this part.

handleA :: A ~> O
handleB :: B ~> O
someBox :: Either A B ~> O
someBox = handleA ||| handleB

So far so good. Now, further downstream (someBox >>> otherBox) there is:

otherBox :: O ~> Either C B

Let's say C are "normal" outputs, and B are control signals that need to get wired back to someBox. Control signals are rare, so maybe there's 1000 C outputs and only 1 B output in a certain timeframe. Also note that in this CPS style streamprocessing, there's no 1-on-1 relation between input and output, so on 1 input (O), otherBox might produce 2 outputs (2 times C), or 4 outputs (3 times C and 1 time B).

To "wire back" B's to someBox, I'm pretty sure I need to use ArrowLoop. But (loop :: a (b, d) (c, d) -> a b c) uses tuples, which means the processing will only continue when both inputs are available. So if I turn someBox into (A, B) ~> O and otherBox into O ~> (C, B), the processing will instantly halt, waiting for a B to arrive after the A comes in.

I know about the 'delay' trick that usually works for loops, where an output value is just inserted before the actual outputs, but that won't help in my case, because otherBox doesn't produce B's at the same rate that someBox receives A's, so by inserting a dummy B, someBox will only "run" once. Also, there's no relation between the number of A inputs someBox receives and the number of inputs for otherBox, so I also can't have otherBox just insert "noop" signals after every "run".

So loop really doesn't seem to help here, but I couldn't find another way either to feed outputs back into the system. What I need is:

Either A B ~> Either C B -> A ~> C

Does such a thing exist?

Thanks,
Mathijs

PS: I remember reading about (unfortunately didn't get the examples working) Reactive (FRP), which has the concept of Events and Behaviors. I think Yampa has something like it too. A Behavior has a value at all times, which was updated by Events. That sounds a bit like it might solve my problem, since I need something that contains "Latest Control Signal was X" that updates when B's are produced, but that doesn't have to be put "on the wire" as a stream of events (because I can't determine how many events to put to match incoming A's). However, I hope there is another solution because I found Reactive and Yampa quite hard to grasp.
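For one concrete stream-processor type, the combinator asked for above can be written directly. The following is a minimal sketch, assuming a Put/Get processor in the style of Hughes' paper rather than an arbitrary arrow. The names runSP, feedback and demo are hypothetical and do not come from the thread; queued control signals are given priority over fresh A inputs, which is one possible policy among several.

```haskell
-- A stream processor in the style of Hughes' paper: either emit an output
-- and continue, or wait for the next input.
data SP a b = Put b (SP a b) | Get (a -> SP a b)

-- Run a processor against a finite list of inputs (helper for experiments).
runSP :: SP a b -> [a] -> [b]
runSP (Put b k) as       = b : runSP k as
runSP (Get f)   (a : as) = runSP (f a) as
runSP (Get _)   []       = []

-- Hypothetical combinator: every Right output is fed back in as a Right
-- input. Pending control signals sit in a queue and are delivered before
-- the next external A is requested.
feedback :: SP (Either a b) (Either c b) -> SP a c
feedback = go []
  where
    go q       (Put (Left c)  k) = Put c (go q k)
    go q       (Put (Right b) k) = go (q ++ [b]) k
    go (b : q) (Get f)           = go q (f (Right b))
    go []      (Get f)           = Get (\a -> go [] (f (Left a)))

-- Hypothetical inner box: echoes every A as a C, emits a control signal for
-- multiples of 3, and answers a control signal with its negation as a C.
demo :: SP (Either Int Int) (Either Int Int)
demo = Get step
  where
    step (Left a)
      | a `mod` 3 == 0 = Put (Left a) (Put (Right a) demo)
      | otherwise      = Put (Left a) demo
    step (Right b)     = Put (Left (negate b)) demo
```

With these definitions, runSP (feedback demo) [1, 2, 3, 4] yields [1, 2, 3, -3, 4]: no dummy B is needed for inputs that produce no control signal. A processor that emits control signals forever without ever asking for input would make feedback diverge.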

On 11-03-30 05:29 PM, Mathijs Kwik wrote:
Hi all,
I'm playing around a bit with arrows (more specifically, something like a CPS style streamprocessor as described in "Generalising Monads to Arrows" by John Hughes).
I had struggled with the same problem a year ago, and I concluded it was hopeless. See http://www.haskell.org/pipermail/haskell-cafe/2010-January/072193.html The only stream processors that conform to the Arrow interface seem to be the FRP-like ones, where each stream item comes with a time stamp. I'm not entirely convinced that Arrow is the best abstraction even in that case.

On Mar 30, 2011, at 5:29 PM, Mathijs Kwik wrote:
So loop really doesn't seem to help here, but I couldn't find another way either to feed outputs back into the system. What I need is: Either A B ~> Either C B -> A ~> C
Does such a thing exist?
Based on your description, it sounds to me like you want a loop such that if a B is generated at the end, you send it back to the start and execute your arrow a second time. Is that in fact what you intended? However, the docs for ArrowLoop's loop function make it clear that it executes the arrow only once: http://www.haskell.org/ghc/docs/latest/html/libraries/base-4.3.1.0/Control-A... :loop If your arrows are instances of ArrowApply, I believe you could use app to implement the combinator you want. Cheers, -Matt

On Thu, Mar 31, 2011 at 11:01 AM, Matthew Steele
On Mar 30, 2011, at 5:29 PM, Mathijs Kwik wrote:
So loop really doesn't seem to help here, but I couldn't find another way either to feed outputs back into the system. What I need is: Either A B ~> Either C B -> A ~> C
Does such a thing exist?
Based on your description, it sounds to me like you want a loop such that if a B is generated at the end, you send it back to the start and execute your arrow a second time. Is that in fact what you intended? However, the docs for ArrowLoop's loop function make it clear that it executes the arrow only once:
http://www.haskell.org/ghc/docs/latest/html/libraries/base-4.3.1.0/Control-A...
If your arrows are instances of ArrowApply, I believe you could use app to implement the combinator you want.
If your arrow's combinators are lazy enough, you can do something like this:
foo :: ArrowChoice (~>) => (Either a b ~> Either c b) -> (a ~> c)
foo f = (id ||| bar) . f . arr Left
  where
    bar = (id ||| bar) . f . arr Right
But maybe Arrow isn't the proper abstraction. Perhaps something more
similar to Fudgets is appropriate?
--
Dave Menendez
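To see what foo does, it can be tried on the plain function arrow. The sketch below restates it with an ordinary type variable p in place of the infix (~>) variable, which newer GHC versions no longer accept as a type variable. countdown is a hypothetical step function made up for illustration.

```haskell
import Prelude hiding (id, (.))
import Control.Category (id, (.))
import Control.Arrow (ArrowChoice, arr, (|||))

-- The combinator from the message above, with `p` standing in for `(~>)`.
foo :: ArrowChoice p => p (Either a b) (Either c b) -> p a c
foo f = (id ||| bar) . f . arr Left
  where
    -- Keep re-running f on fed-back values until it produces a Left.
    bar = (id ||| bar) . f . arr Right

-- Hypothetical step function: an external input starts a countdown, and
-- each fed-back value is decremented until it reaches zero.
countdown :: Either Int Int -> Either String Int
countdown (Left n) = Right n
countdown (Right n)
  | n <= 0    = Left "done"
  | otherwise = Right (n - 1)
```

For functions, foo countdown 3 evaluates to "done" after four trips around the loop. Note that this shape yields exactly one c per a, so on its own it does not cover a box that emits several outputs per input.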

Thank you all, As you all pointed out, arrows are just not up to this without resorting to tricks such as timestamping. I'm gonna have a look at the Peakachu library now ( http://hackage.haskell.org/packages/archive/peakachu/0.3.0/doc/html/FRP-Peak... ) (thanks Gergely!), it looks like a nice API and seems to match some ideas I had in mind. Have a nice weekend! Mathijs

Forgot to CC the list, please see below.
On Wed, Mar 30, 2011 at 2:29 PM, Mathijs Kwik
someBox :: Either A B ~> O
someBox = handleA ||| handleB
Not sure about this. If you are modeling the input as Either A B, then you are excluding the possibility of both A and B occurring at the same time. I suggest you change the type to:
someBox :: (Maybe A, Maybe B) ~> O
Based on your later comments, you implied that there could be multiple B produced from one O. Then I'd suggest the following type:
someBox :: (Maybe A, [B]) ~> O
otherBox :: O ~> Either C B
Also note that in this CPS style streamprocessing, there's no 1-on-1 relation between input and output, so on 1 input (O), otherBox might produce 2 outputs (2 times C), or 4 outputs (3 times C and 1 time B).
If the number of inputs does not match the number of outputs, I suggest you change the type to:
otherBox :: O ~> [Either C B]
To "wire back" B's to someBox, I'm pretty sure I need to use ArrowLoop. But (loop :: a (b, d) (c, d) -> a b c) uses tuples, which means the processing will only continue when both inputs are available. So if I turn someBox into (A, B) ~> O and otherBox into O ~> (C, B), the processing will instantly halt, waiting for a B to arrive after the A comes in.
You can do something like this. First, split the B out of the output:
split :: [Either C B] ~> ([C], [B])
Then the loop back:
loop (someBox >>> otherBox >>> split) :: Maybe A ~> [C]
--
Regards, Paul Liu
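The split step needs no arrow-specific machinery. As a small sketch (the type variable p is only a stand-in for whichever arrow is in use), it can be lifted from Data.Either.partitionEithers:

```haskell
import Control.Arrow (Arrow, arr)
import Data.Either (partitionEithers)

-- Separate normal outputs (Left) from control signals (Right),
-- preserving the order within each group.
split :: Arrow p => p [Either c b] ([c], [b])
split = arr partitionEithers
```

On the function arrow, split can be applied directly to a list, which makes it easy to check in GHCi before wiring it into a loop.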

On Fri, Apr 1, 2011 at 8:28 PM, Paul L
Forgot to CC the list, please see below.
On Wed, Mar 30, 2011 at 2:29 PM, Mathijs Kwik wrote:
someBox :: Either A B ~> O
someBox = handleA ||| handleB
Not sure about this. If you are modeling the input as Either A B, then you are excluding the possibility of both A and B occurring at the same time. I suggest you change the type to:
someBox :: (Maybe A, Maybe B) ~> O
In case they both occur, I just prioritize 1 over the other, so I handle A, and on the next run handle B. Your suggestion might come useful later on though, if prioritizing won't do the trick in certain cases.
Based on your later comments, you implied that there could be multiple B produced from one O. Then I'd suggest the following type:
someBox :: (Maybe A, [B]) ~> O
This implies that otherBox buffers its results somehow to produce the list of B's and present it as 1 result. I think this defies the CPS style stream processors goal. In reality, the outputs might be infinite, or just very very many, which will cause space leaks if they need to be buffered.
otherBox :: O ~> Either C B
Also note that in this CPS style streamprocessing, there's no 1-on-1 relation between input and output, so on 1 input (O), otherBox might produce 2 outputs (2 times C), or 4 outputs (3 times C and 1 time B).
If the number of inputs does not match the number of outputs, I suggest you change the type to:
otherBox :: O ~> [Either C B]
I think you are suggesting to use lists to essentially synchronize the input/output streams so every "step" of every part of the program will always produce an output for every input. In case no "real" output was produced, the result is just []. This is a very interesting thought but I think there will be issues with this. The "start" (someBox) will wait before every "run" until it receives the result from the "end", even if this is just []. If the loop becomes larger, includes heavy calculations, or has to wait for IO, this might take quite some time. Essentially the loop as a whole will be running as some kind of singleton. It will only restart if the last "run" has fully completed, meaning all inbetween steps are doing no processing in the mean time. Also, the full loop (every step within it) will need to run for every input. If someBox filters its inputs and only acts on 1 in a million of its inputs, it will now have to send [] downstream for every dropped input and wait for the end of the loop to feed-back [] to continue.
To "wire back" B's to someBox, I'm pretty sure I need to use ArrowLoop. But (loop :: a (b, d) (c, d) -> a b c) uses tuples, which means the processing will only continue when both inputs are available. So if I turn someBox into (A, B) ~> O and otherBox into O ~> (C, B), the processing will instantly halt, waiting for a B to arrive after the A comes in.
You can do something like this. First, split the B out of the output:
split :: [Either C B] ~> ([C], [B])
Then the loop back:
loop (someBox >>> otherBox >>> split) :: Maybe A ~> [C]
I must say, you did solve the problem I posted and I am gonna have a look at its implications this weekend. It's probably not gonna work in all situations and in a way defeats stream processing's advantages, but it might still be useful in certain situations. Thank you for your advice, it at least got me thinking in directions I didn't consider before. Mathijs
-- Regards, Paul Liu

On Fri, Apr 1, 2011 at 1:09 PM, Mathijs Kwik
I think this defies the CPS style stream processors goal. In reality, the outputs might be infinite, or just very very many, which will cause space leaks if they need to be buffered.
If the input (and in your case, the output fed back as input) cannot be promptly consumed, it's a space/time leak in any real-time system. In any case, it is an implementation issue (which hopefully has some solution), not a conceptual one.
The "start" (someBox) will wait before every "run" until it receives the result from the "end", even if this is just [].
In order for it not to deadlock, you need some delay/init primitive so that the output is initialized *before* the input is.
If the loop becomes larger, includes heavy calculations, or has to wait for IO, this might take quite some time. Essentially the loop as a whole will be running as some kind of singleton. It will only restart if the last "run" has fully completed, meaning all inbetween steps are doing no processing in the mean time.
Under the usual arrow abstraction, stream input is processed synchronously (next one is not consumed until previous one is). But it doesn't mean that you cannot return a value before it is fully computed. For pure computations, you can use "par" to evaluate in parallel and return the value from an arrow before it is fully available. For IO computations, you may use the hGetContents or getChanContents trick to return from IO without waiting. This way, subsequent arrow computations that do not depend on the full availability of their inputs may proceed promptly. Otherwise, you have to wait for the computation or IO to complete *anyway*, which is more of a data dependency issue that is orthogonal to the abstract model you choose to program with.
Also, the full loop (every step within it) will need to run for every input. If someBox filters its inputs and only acts on 1 in a million of its inputs, it will now have to send [] downstream for every dropped input and wait for the end of the loop to feed-back [] to continue.
It depends on how your program is logically structured. You can filter out things by skipping otherBox entirely using ArrowChoice, which means otherBox is not run at all when there is no input for it. On the other hand, if you want otherBox to run constantly (e.g., when you do IO or update internal state), then I don't see the issue of passing [] to and getting [] from it (which supposedly symbolizes that no input is consumed and no output is produced, and hence there is no wait for heavy computation or IO).
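As a rough sketch of the ArrowChoice idea, the helper below (gate is a hypothetical name, not something from the thread or from a library) tags each input as kept or dropped and only runs the wrapped box on kept ones. Dropped inputs produce [] without the box being run.

```haskell
import Control.Arrow (ArrowChoice, arr, (>>>), (|||))

-- Hypothetical helper: run `box` only on inputs that satisfy `keep`;
-- every other input bypasses the box and yields an empty output list.
gate :: ArrowChoice p => (a -> Bool) -> p a [c] -> p a [c]
gate keep box =
  arr (\a -> if keep a then Right a else Left a)
    >>> (arr (const []) ||| box)
```

On the function arrow, gate even (\n -> [n * 2]) maps odd inputs to [] and even inputs to a singleton list, so an expensive box sits idle for everything that is filtered out.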
I must say, you did solve the problem I posted and I am gonna have a look at its implications this weekend. It's probably not gonna work in all situations and in a way defeats stream processing's advantages, but it might still be useful in certain situations.
I'd love to hear back from you, perhaps on a more concrete case where things do or do not work out as you intended. -- Regards, Paul Liu
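The delay/init requirement mentioned above can be made concrete with a small synchronous automaton. This is a sketch under stated assumptions: Auto, runAuto, arrAuto, feedbackInit and the toy someBox/otherBox functions are all hypothetical names chosen for illustration, and the automaton type is a stand-in for whatever arrow the program really uses. The fed-back value is initialised with a seed, and the value produced at one step is consumed at the next, so no step waits on its own output.

```haskell
import Data.Either (partitionEithers)

-- A synchronous automaton: one input in, one output plus a successor out.
newtype Auto a b = Auto { stepAuto :: a -> (b, Auto a b) }

-- Run an automaton over a finite list of inputs.
runAuto :: Auto a b -> [a] -> [b]
runAuto _        []       = []
runAuto (Auto f) (a : as) = let (b, k) = f a in b : runAuto k as

-- Lift a pure function to a stateless automaton.
arrAuto :: (a -> b) -> Auto a b
arrAuto f = Auto (\a -> (f a, arrAuto f))

-- Feedback with an initial value: the d produced at step n is the d
-- consumed at step n + 1, starting from the seed d0.
feedbackInit :: d -> Auto (b, d) (c, d) -> Auto b c
feedbackInit d0 (Auto f) = Auto $ \b ->
  let ((c, d1), k) = f (b, d0)
  in (c, feedbackInit d1 k)

-- Toy boxes in the shapes suggested in the thread.
someBox :: (Maybe Int, [Int]) -> Int
someBox (ma, bs) = maybe 0 id ma + sum bs

otherBox :: Int -> [Either Int Int]
otherBox o = Left o : [Right 10 | even o]

-- The looped pipeline: Maybe A in, [C] out, [B] fed back one step later.
pipeline :: Auto (Maybe Int) [Int]
pipeline = feedbackInit [] (arrAuto (partitionEithers . otherBox . someBox))
```

Running runAuto pipeline over a list of Maybe Int values shows control signals from one step influencing the next, while steps without a control signal simply see [].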
participants (5)
- David Menendez
- Mario Blažević
- Mathijs Kwik
- Matthew Steele
- Paul L