
We're currently using a TMChan to broadcast from a single producer thread to many consumer threads. This works well! However, we're seeing issues with a fast producer and/or a slow consumer, with the channel growing unbounded. Fortunately, our producer-consumer communication is time-sensitive and tolerant of loss: we're ok with the producer always writing at the expense of dropping communication to a slow consumer.

A TBMChan provides a bounded channel (but no means to dupe/broadcast) where a writer will block once the channel fills up. In our use case, we'd like the writer to keep writing, dropping the oldest elements off the end of the channel instead of blocking. Clojure's core.async library has some related concepts, in particular the notion of a sliding buffer https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer that drops the oldest elements once full. Has anyone encountered something similar in working with channels and/or have any solutions? Thanks!

Mark
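A minimal sketch of that sliding-write policy on a single bounded channel, assuming the TBChan API from the stm-chans package (this covers only the dropping, not the dupe/broadcast side):

import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TBChan
  (TBChan, newTBChanIO, tryReadTBChan, tryWriteTBChan, writeTBChan)
import Control.Monad (unless, void)

-- Never block the producer: if the channel is full, discard the oldest
-- element to make room, like Clojure's sliding buffer.
slidingWrite :: TBChan a -> a -> STM ()
slidingWrite chan x = do
  ok <- tryWriteTBChan chan x
  unless ok $ do
    void (tryReadTBChan chan)   -- drop the oldest element
    writeTBChan chan x

main :: IO ()
main = do
  chan <- newTBChanIO 8
  mapM_ (atomically . slidingWrite chan) [1 .. 100 :: Int]
  -- a consumer reading now sees only the newest 8 values, 93..100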

Perhaps a circular buffer interface to TArray would be nice:

data CircularTChan a = CircularTChan
  { tchanHead   :: TVar Int
  , tchanLength :: TVar Int
  , tchanArray  :: TArray Int (Maybe a)
  }
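A hypothetical write operation for that structure (the function and the index arithmetic below are a sketch, not part of the suggestion above): the next free slot is filled until the buffer reaches capacity, after which the oldest slot is overwritten and the head advances.

import Control.Concurrent.STM (STM, TVar, readTVar, writeTVar)
import Control.Concurrent.STM.TArray (TArray)
import Data.Array.MArray (getBounds, writeArray)

-- Uses the CircularTChan record sketched above.
writeCircular :: CircularTChan a -> a -> STM ()
writeCircular (CircularTChan hdVar lenVar arr) x = do
  (lo, hi) <- getBounds arr
  let cap = hi - lo + 1
  hd  <- readTVar hdVar
  len <- readTVar lenVar
  -- Slot just past the last live element (wrapping around); when the
  -- buffer is full this is exactly the oldest slot.
  let pos = lo + ((hd - lo) + len) `mod` cap
  writeArray arr pos (Just x)
  if len < cap
    then writeTVar lenVar (len + 1)                        -- still filling up
    else writeTVar hdVar (lo + ((hd - lo) + 1) `mod` cap)  -- dropped the oldest; advance head

Each consumer would additionally need its own read cursor to get broadcast-style reads out of this.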

I think you should be able to do this with the `pipes` and
`pipes-concurrency` libraries, in particular have a look at:
http://haddock.stackage.org/lts-5.0/pipes-concurrency-2.0.5/Pipes-Concurrent...
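A rough sketch of that approach, assuming the `newest` buffer from Pipes.Concurrent (which, as far as I recall, keeps only the most recent N values and silently discards older ones when the mailbox fills up):

import Control.Concurrent.Async (async, wait)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P
import System.Mem (performGC)

main :: IO ()
main = do
  -- 'newest 16' bounds the mailbox at 16 elements and drops the oldest
  -- value instead of blocking the producer when it is full.
  (output, input) <- spawn (newest 16)
  p <- async $ do
    runEffect $ each [1 .. 10000 :: Int] >-> toOutput output
    performGC   -- allow the mailbox to seal once the producer is done
  c <- async $ do
    runEffect $ fromInput input >-> P.print
    performGC
  mapM_ wait [p, c]

A slow consumer here simply misses intermediate values rather than making the producer block or the mailbox grow without bound.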
--
Noon Silk, ن
https://silky.github.io/

"Every morning when I wake up, I experience an exquisite joy — the joy of being this signature."

Control.Concurrent.MVar has an example implementation of a SkipChan, which is pretty close to what you want: https://hackage.haskell.org/package/base-4.8.2.0/docs/Control-Concurrent-MVa...

I also have a package called KickChan that implements something similar; if a consumer gets too far behind, the channel becomes stale and the consumer will need to reconnect. This can be useful if, e.g., the consumer gets an initial state and the channel only communicates updates.
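For reference, an approximate from-memory reconstruction of that SkipChan (see the linked documentation for the canonical version): a single cell holds the latest value plus the wake-up semaphores of readers that have already seen it, so writers never block and readers always observe only the newest value.

import Control.Concurrent.MVar

-- The shared cell holds the latest value and the semaphores of readers
-- that have already consumed it; each reader keeps its own semaphore.
data SkipChan a = SkipChan (MVar (a, [MVar ()])) (MVar ())

newSkipChan :: IO (SkipChan a)
newSkipChan = do
  sem  <- newEmptyMVar
  main <- newMVar (undefined, [sem])   -- no value yet; readers block until the first put
  return (SkipChan main sem)

-- Writers never block: overwrite the value and wake every waiting reader.
putSkipChan :: SkipChan a -> a -> IO ()
putSkipChan (SkipChan main _) v = do
  (_, sems) <- takeMVar main
  putMVar main (v, [])
  mapM_ (\sem -> putMVar sem ()) sems

-- Readers block until a value newer than the one they last saw arrives.
getSkipChan :: SkipChan a -> IO a
getSkipChan (SkipChan main sem) = do
  takeMVar sem
  (v, sems) <- takeMVar main
  putMVar main (v, sem : sems)
  return v

-- A duplicate starts out "up to date" and only sees values written later.
dupSkipChan :: SkipChan a -> IO (SkipChan a)
dupSkipChan (SkipChan main _) = do
  sem <- newEmptyMVar
  (v, sems) <- takeMVar main
  putMVar main (v, sem : sems)
  return (SkipChan main sem)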

Hi Mark,

your question made me take a look at the TChan implementation, which I had always wanted to do (but never had the time). To test my understanding, I sketched a TChan variation that should solve the problem. (A test with one sender and two receivers showed the expected output, but I did not measure memory usage.)

The sender replaces older messages with a marker. This should make the content available to the garbage collector (if it is not referenced by a receiver who has read it). On reading a marker, a receiver skips directly to the next valid message.

On the downside, the sender keeps a reference to the last n messages, so they will not be garbage collected even if every receiver has read them.

Thomas

{-# LANGUAGE CPP, DeriveDataTypeable #-}

module Control.Concurrent.STM.TBBroadcast(
#ifdef __GLASGOW_HASKELL__
    TSender, TReceiver, newSender, newSenderIO, writeBC, newReceiver, readBC
#endif
  ) where

#ifdef __GLASGOW_HASKELL__

import GHC.Conc
import Data.Typeable (Typeable)

data TSender a = Sender
    {-# UNPACK #-} !(TVar Int)            -- remaining window capacity
    {-# UNPACK #-} !(TVar (TVarList a))   -- oldest retained cell
    {-# UNPACK #-} !(TVar (TVarList a))   -- write end (the hole)
  deriving (Eq, Typeable)

type TVarList a = TVar (TList a)

-- Outdated marks a cell whose payload has been dropped; it points back at
-- the sender's head so a lagging receiver can skip ahead.
data TList a = TNil
             | TCons a {-# UNPACK #-} !(TVarList a)
             | Outdated {-# UNPACK #-} !(TVar (TVarList a))

newSender :: Int -> STM (TSender a)
newSender n
  | n <= 0    = error "window size must be >= 1"
  | otherwise = do
      hole  <- newTVar TNil
      first <- newTVar hole
      end   <- newTVar hole
      count <- newTVar n
      return (Sender count first end)

newSenderIO :: Int -> IO (TSender a)
newSenderIO n
  | n <= 0    = error "window size must be >= 1"
  | otherwise = do
      hole  <- newTVarIO TNil
      first <- newTVarIO hole
      end   <- newTVarIO hole
      count <- newTVarIO n
      return (Sender count first end)

-- Append the new message; once the window is full, drop the oldest cell
-- and overwrite it with an Outdated marker.
writeBC :: TSender a -> a -> STM ()
writeBC (Sender count first end) a = do
    listend     <- readTVar end
    new_listend <- newTVar TNil
    writeTVar listend (TCons a new_listend)
    writeTVar end new_listend
    n <- readTVar count
    case n of
      0 -> do
        listhead <- readTVar first
        head     <- readTVar listhead
        case head of
          TCons _ tl -> writeTVar first tl   -- the head is always a TCons here
        writeTVar listhead (Outdated first)
      _ -> writeTVar count $! (n-1)

data TReceiver a = Receiver {-# UNPACK #-} !(TVar (TVarList a))

-- A new receiver starts at the current write end, i.e. it only sees
-- messages written after it was created.
newReceiver :: TSender a -> STM (TReceiver a)
newReceiver (Sender _ _ end) = do
    hole  <- readTVar end
    first <- newTVar hole
    return (Receiver first)

readBC :: TReceiver a -> STM a
readBC (Receiver first) = do
    listhead <- readTVar first
    head     <- readTVar listhead
    case head of
      TNil -> retry
      TCons a tl -> do
        writeTVar first tl
        return a
      Outdated next -> do
        -- jump to the sender's current head and retry from there
        next' <- readTVar next
        writeTVar first next'
        readBC (Receiver first)

#endif
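A small usage sketch for the module above (assuming a window of 16 messages and two receivers; a receiver that falls more than 16 messages behind skips ahead via the Outdated marker):

import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forM_, forever)
import GHC.Conc (atomically)
import Control.Concurrent.STM.TBBroadcast

main :: IO ()
main = do
  s <- newSenderIO 16                        -- sliding window of 16 messages
  forM_ [1, 2 :: Int] $ \i ->
    forkIO $ do
      r <- atomically (newReceiver s)
      forever $ do
        x <- atomically (readBC r)
        putStrLn ("receiver " ++ show i ++ " got " ++ show x)
  forM_ [1 .. 1000 :: Int] $ \x -> atomically (writeBC s x)
  threadDelay 1000000                        -- crude: give the receivers a moment to drain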

Hey Thomas,

Thanks for the implementation ideas! It's worked out great for us and introduced a lot of stability in our system! Thanks again for your help!

Mark
participants (5)

- Eric O'Connor
- John Lato
- Mark Fine
- Noon Silk
- Thomas Horstmeyer