Hey Thomas,

Thanks for the implementation ideas! It's worked out great for us and introduced a lot of stability in our system! Thanks again for your help!

Mark

On Wed, Feb 3, 2016 at 5:42 PM, Thomas Horstmeyer <horstmey@mathematik.uni-marburg.de> wrote:
Hi Mark,

your question made me take a look at the TChan implementation, which I always had wanted to do (but never had the time). To test my understanding, I sketched a TChan variation that should solve the problem. (A test with one sender and two receivers showed expected output but I did not measure memory usage.)

The sender replaces older messages with a marker. This should make the content available to the garbage collector (if it is not referenced by a receiver who has read it). On reading a marker, a receiver skips directly to the next valid message.

On the downside, the sender keeps a reference to the last n messages, so they will not be garbage collected even if every receiver has read them.

Thomas


{-# LANGUAGE CPP, DeriveDataTypeable #-}

module Control.Concurrent.STM.TBBroadcast(
#ifdef __GLASGOW_HASKELL__
  TSender, TReceiver,
  newSender, newSenderIO, writeBC,
  newReceiver, readBC
#endif
) where

#ifdef __GLASGOW_HASKELL__

import GHC.Conc
import Data.Typeable (Typeable)


data TSender a = Sender {-# UNPACK #-} !(TVar Int)
                        {-# UNPACK #-} !(TVar (TVarList a))
                        {-# UNPACK #-} !(TVar (TVarList a))
  deriving (Eq, Typeable)


type TVarList a = TVar (TList a)
data TList a = TNil
             | TCons a {-# UNPACK #-} !(TVarList a)
             | Outdated {-# UNPACK #-} !(TVar (TVarList a))


newSender :: Int -> STM (TSender a)
newSender n | n <= 0    = error "windows size must be >=0"
            | otherwise = do
  hole <- newTVar TNil
  first <- newTVar hole
  end <- newTVar hole
  count <- newTVar n
  return (Sender count first end)

newSenderIO :: Int -> IO (TSender a)
newSenderIO n | n <= 0    = error "windows size must be >=0"
            | otherwise = do
  hole <- newTVarIO TNil
  first <- newTVarIO hole
  end <- newTVarIO hole
  count <- newTVarIO n
  return (Sender count first end)



writeBC :: TSender a -> a -> STM ()
writeBC (Sender count first end) a = do
  listend <- readTVar end
  new_listend <- newTVar TNil
  writeTVar listend (TCons a new_listend)
  writeTVar end new_listend
  n <- readTVar count
  case n of
     0 -> do
             listhead <- readTVar first
             head <- readTVar listhead
             case head of
               TCons _ tl -> writeTVar first tl
             writeTVar listhead (Outdated first)
     _ -> writeTVar count $! (n-1)


data TReceiver a = Receiver {-# UNPACK #-} !(TVar (TVarList a))

newReceiver :: TSender a -> STM (TReceiver a)
newReceiver (Sender _ _ end) = do
  hole <- readTVar end
  first <-newTVar hole
  return (Receiver first)


readBC :: TReceiver a -> STM a
readBC (Receiver first) = do
  listhead <- readTVar first
  head <- readTVar listhead
  case head of
    TNil -> retry
    TCons a tl -> do
      writeTVar first tl
      return a
    Outdated next -> do
      next' <- readTVar next
      writeTVar first next'
      readBC (Receiver first)

#endif


Am 28.01.2016 um 20:30 schrieb Mark Fine:
We're currently using a TMChan to broadcast from a single producer
thread to many consumer threads. This works well! However, we're seeing
issues with a fast producer and/or a slow consumer, with the channel
growing unbounded. Fortunately, our producer-consumer communication is
time-sensitive and tolerant of loss: we're ok with the producer always
writing at the expense of dropping communication to a slow consumer.

A TMBChan provides a bounded channel (but no means to dupe/broadcast)
where a writer will block once the channel fills up. In our use case,
we'd like to continue writing to the channel but dropping off the end of
the channel. Clojure's core-async module has some related concepts, in
particular the notion of a sliding buffer
<https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer> that
drops the oldest elements once full. Has anyone encountered something
similar in working with channels and/or have any solutions? Thanks!

Mark


_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe