Hi Mark,
your question made me take a look at the TChan implementation, which I had always wanted to do (but never found the time for). To test my understanding, I sketched a TChan variation that should solve the problem. (A test with one sender and two receivers showed the expected output, but I did not measure memory usage.)
The sender replaces older messages with a marker. This should make their content available to the garbage collector (unless a receiver that has already read a message still holds a reference to it). On reading a marker, a receiver skips directly to the next valid message.
On the downside, the sender keeps a reference to the last n messages, so they will not be garbage collected even if every receiver has read them.
Thomas
{-# LANGUAGE CPP, DeriveDataTypeable #-}
module Control.Concurrent.STM.TBBroadcast (
#ifdef __GLASGOW_HASKELL__
    TSender, TReceiver,
    newSender, newSenderIO, writeBC,
    newReceiver, readBC
#endif
  ) where

#ifdef __GLASGOW_HASKELL__

import GHC.Conc
import Data.Typeable (Typeable)

-- A sender holds the number of free window slots, a pointer to the oldest
-- message still inside the window, and a pointer to the write end.
data TSender a = Sender {-# UNPACK #-} !(TVar Int)
                        {-# UNPACK #-} !(TVar (TVarList a))
                        {-# UNPACK #-} !(TVar (TVarList a))
  deriving (Eq, Typeable)

type TVarList a = TVar (TList a)

-- A cell is either the not-yet-written end of the list, a message followed
-- by the rest of the list, or a marker left where an old message was
-- dropped; the marker points at the sender's head so a slow receiver can
-- catch up.
data TList a = TNil
             | TCons a {-# UNPACK #-} !(TVarList a)
             | Outdated {-# UNPACK #-} !(TVar (TVarList a))

-- | Create a sender that keeps a window of the last n messages.
newSender :: Int -> STM (TSender a)
newSender n
  | n <= 0    = error "window size must be > 0"
  | otherwise = do
      hole  <- newTVar TNil
      first <- newTVar hole
      end   <- newTVar hole
      count <- newTVar n
      return (Sender count first end)

-- | Like 'newSender', but usable outside of a transaction.
newSenderIO :: Int -> IO (TSender a)
newSenderIO n
  | n <= 0    = error "window size must be > 0"
  | otherwise = do
      hole  <- newTVarIO TNil
      first <- newTVarIO hole
      end   <- newTVarIO hole
      count <- newTVarIO n
      return (Sender count first end)

-- | Write a message. Once the window of n messages is full, the oldest
-- message is overwritten with an 'Outdated' marker, so slow receivers skip
-- it and its content becomes available to the garbage collector.
writeBC :: TSender a -> a -> STM ()
writeBC (Sender count first end) a = do
  listend     <- readTVar end
  new_listend <- newTVar TNil
  writeTVar listend (TCons a new_listend)
  writeTVar end new_listend
  n <- readTVar count
  case n of
    0 -> do
      -- Window full: advance the head past the oldest message and leave
      -- a marker pointing back at the sender's head TVar.
      listhead <- readTVar first
      hd       <- readTVar listhead
      case hd of
        TCons _ tl -> writeTVar first tl
        _          -> return ()  -- cannot happen: the window holds at least one message
      writeTVar listhead (Outdated first)
    _ -> writeTVar count $! (n - 1)

-- A receiver is just a pointer into the sender's list.
data TReceiver a = Receiver {-# UNPACK #-} !(TVar (TVarList a))

-- | Create a receiver; it sees only messages written after its creation.
newReceiver :: TSender a -> STM (TReceiver a)
newReceiver (Sender _ _ end) = do
  hole  <- readTVar end
  first <- newTVar hole
  return (Receiver first)

-- | Read the next message, retrying if none is available yet. On an
-- 'Outdated' marker the receiver jumps ahead to the oldest message that is
-- still inside the window.
readBC :: TReceiver a -> STM a
readBC (Receiver first) = do
  listhead <- readTVar first
  hd       <- readTVar listhead
  case hd of
    TNil -> retry
    TCons a tl -> do
      writeTVar first tl
      return a
    Outdated next -> do
      next' <- readTVar next
      writeTVar first next'
      readBC (Receiver first)

#endif
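
For illustration only, here is a minimal usage sketch (not part of the module above; the window size of 10, the message count, and the delays are arbitrary choices): a fast producer and one deliberately slow consumer, assuming the module is importable as Control.Concurrent.STM.TBBroadcast.

import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forM_, forever)
import GHC.Conc (atomically)
import Control.Concurrent.STM.TBBroadcast

main :: IO ()
main = do
  sender   <- newSenderIO 10                  -- keep a window of 10 messages
  receiver <- atomically (newReceiver sender)
  _ <- forkIO $ forever $ do                  -- deliberately slow consumer
         x <- atomically (readBC receiver)
         print x
         threadDelay 100000                   -- 0.1 s per message
  forM_ [1 .. 1000 :: Int] $ \i -> do         -- fast producer
    atomically (writeBC sender i)
    threadDelay 1000                          -- 1 ms per message
  threadDelay 2000000                         -- let the consumer catch up before exiting

The slow consumer should print gaps in the sequence where messages fell out of the window, while the producer never blocks.
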
On 28.01.2016 at 20:30, Mark Fine wrote:
We're currently using a TMChan to broadcast from a single producer
thread to many consumer threads. This works well! However, we're seeing
issues with a fast producer and/or a slow consumer, with the channel
growing unbounded. Fortunately, our producer-consumer communication is
time-sensitive and tolerant of loss: we're ok with the producer always
writing at the expense of dropping communication to a slow consumer.
A TMBChan provides a bounded channel (but no means to dupe/broadcast)
where a writer will block once the channel fills up. In our use case,
we'd like to continue writing to the channel, with old messages dropping off the end of
the channel. Clojure's core-async module has some related concepts, in
particular the notion of a sliding buffer
<https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer> that
drops the oldest elements once full. Has anyone encountered something
similar in working with channels and/or have any solutions? Thanks!
Mark