
Hey Thomas,

Thanks for the implementation ideas! It's worked out great for us and introduced a lot of stability in our system! Thanks again for your help!

Mark

On Wed, Feb 3, 2016 at 5:42 PM, Thomas Horstmeyer <horstmey@mathematik.uni-marburg.de> wrote:
Hi Mark,
your question made me take a look at the TChan implementation, which I had always wanted to do (but never had the time). To test my understanding, I sketched a TChan variation that should solve the problem. (A test with one sender and two receivers showed the expected output, but I did not measure memory usage.)
The sender replaces older messages with a marker. This should make the content available to the garbage collector (if it is not referenced by a receiver who has read it). On reading a marker, a receiver skips directly to the next valid message.
On the downside, the sender keeps a reference to the last n messages, so they will not be garbage collected even if every receiver has read them.
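As a rough, pure model of the intended semantics (not the STM code itself, and ignoring concurrency and garbage collection entirely), the window can be pictured as a list that drops its oldest entry once it is full. The names `slidingPush` and `seenBySlowReceiver` are made up for this illustration, and the model assumes a receiver that was created before the writes and has not read anything yet:

```haskell
-- | Append a message, then drop the oldest entries so that at most
-- 'window' messages remain. (Hypothetical helper, for illustration only.)
slidingPush :: Int -> [a] -> a -> [a]
slidingPush window buf x =
  let buf' = buf ++ [x]
  in drop (length buf' - window) buf'  -- 'drop' with n <= 0 keeps everything

-- | What a receiver that has not read anything yet could still read
-- after the sender wrote all of 'msgs' with the given window size.
seenBySlowReceiver :: Int -> [a] -> [a]
seenBySlowReceiver window = foldl (slidingPush window) []
```

For example, `seenBySlowReceiver 2 [1..5]` evaluates to `[4,5]`: the three oldest messages play the role of the markers that the receiver skips over.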
Thomas
{-# LANGUAGE CPP, DeriveDataTypeable #-}
module Control.Concurrent.STM.TBBroadcast(
#ifdef __GLASGOW_HASKELL__
    TSender, TReceiver, newSender, newSenderIO, writeBC, newReceiver, readBC
#endif
  ) where

#ifdef __GLASGOW_HASKELL__
import GHC.Conc
import Data.Typeable (Typeable)

data TSender a = Sender {-# UNPACK #-} !(TVar Int)
                        {-# UNPACK #-} !(TVar (TVarList a))
                        {-# UNPACK #-} !(TVar (TVarList a))
    deriving (Eq, Typeable)

type TVarList a = TVar (TList a)
data TList a = TNil
             | TCons a {-# UNPACK #-} !(TVarList a)
             | Outdated {-# UNPACK #-} !(TVar (TVarList a))

newSender :: Int -> STM (TSender a)
newSender n | n <= 0 = error "window size must be > 0"
            | otherwise = do
    hole <- newTVar TNil
    first <- newTVar hole
    end <- newTVar hole
    count <- newTVar n
    return (Sender count first end)

newSenderIO :: Int -> IO (TSender a)
newSenderIO n | n <= 0 = error "window size must be > 0"
              | otherwise = do
    hole <- newTVarIO TNil
    first <- newTVarIO hole
    end <- newTVarIO hole
    count <- newTVarIO n
    return (Sender count first end)

writeBC :: TSender a -> a -> STM ()
writeBC (Sender count first end) a = do
    -- append the new message at the end of the list
    listend <- readTVar end
    new_listend <- newTVar TNil
    writeTVar listend (TCons a new_listend)
    writeTVar end new_listend
    n <- readTVar count
    case n of
        -- window is full: advance the start and replace the oldest cell with a marker
        0 -> do
            listhead <- readTVar first
            head <- readTVar listhead
            case head of
                TCons _ tl -> writeTVar first tl
            writeTVar listhead (Outdated first)
        -- window is not full yet: only count down the free slots
        _ -> writeTVar count $! (n-1)

data TReceiver a = Receiver {-# UNPACK #-} !(TVar (TVarList a))

newReceiver :: TSender a -> STM (TReceiver a)
newReceiver (Sender _ _ end) = do
    hole <- readTVar end
    first <- newTVar hole
    return (Receiver first)

readBC :: TReceiver a -> STM a
readBC (Receiver first) = do
    listhead <- readTVar first
    head <- readTVar listhead
    case head of
        TNil -> retry
        TCons a tl -> do
            writeTVar first tl
            return a
        Outdated next -> do
            next' <- readTVar next
            writeTVar first next'
            readBC (Receiver first)

#endif
On 28.01.2016 at 20:30, Mark Fine wrote:
We're currently using a TMChan to broadcast from a single producer thread to many consumer threads. This works well! However, we're seeing issues with a fast producer and/or a slow consumer, with the channel growing unbounded. Fortunately, our producer-consumer communication is time-sensitive and tolerant of loss: we're ok with the producer always writing at the expense of dropping communication to a slow consumer.
A TMBChan provides a bounded channel (but no means to dupe/broadcast) where a writer will block once the channel fills up. In our use case, we'd like to continue writing to the channel but dropping off the end of the channel. Clojure's core.async library has some related concepts, in particular the notion of a sliding buffer (https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer) that drops the oldest elements once full. Has anyone encountered something similar in working with channels and/or have any solutions? Thanks!
Mark
_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe