
[ Deeply nested replies are starting to look similar to runListT $ runStateT $ runWriter .... ]

matth@mindspring.com wrote:
> On Tue, Jan 03, 2006 at 12:07:43AM +0000, Joel Reymont wrote:
>> On Jan 2, 2006, at 9:20 PM, Chris Kuklewicz wrote:
>>> This makes me ponder one of the things that Joel was trying to do: efficiently pass data to a logging thread. It may be that a custom channel would be helpful for that as well.
>> I have not taken the time to analyze the Chameneos code but need to point out that my problem was not with efficiently passing data to the logging thread. The issue was with data accumulating in the channel and the logger thread not reading it out fast enough.
>> The TChan implementation is a single-linked list implemented on top of TVar's. That would seem pretty efficient to me.
> It's simple and efficient but does nothing to prevent the channel from growing out of control. A slightly modified (custom) channel based on TChan, but enforcing a maximum size (blocking on insert if the channel is too full), probably would have solved the problem.
> I assume that Erlang either does that or increases the priority of threads with large event queues, or both.
> Thanks, Matt Harden
Given that actually controlling priorities is not an option, adding blocking like that makes sense. One can make a ring buffer instead of a singly linked list very easily. In fact, I have that code lying around (now attached). It has not been speed optimized, but I did like being able to express:
type Node a = [TMVar a]

make :: (Integral k) => k -> STM (Node a)
make k = liftM cycle $ sequence $ genericReplicate k newEmptyTMVar
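
To make the point of that one-liner concrete, here is a minimal sketch of why the cyclic list behaves as a ring: walking k steps along the list lands on the same TMVar cell again. The `main` driver and the capacity of 3 are illustrative assumptions; only `Node` and `make` come from the message above.

```haskell
import Control.Concurrent.STM
import Control.Monad (liftM)
import Data.List (genericReplicate)

type Node a = [TMVar a]

-- Allocate k empty cells and tie them into an endless, repeating list.
make :: (Integral k) => k -> STM (Node a)
make k = liftM cycle $ sequence $ genericReplicate k newEmptyTMVar

-- Illustrative driver (not part of the original module).
main :: IO ()
main = do
  ring <- atomically (make (3 :: Int))
  -- Fill the first cell of the ring.
  atomically $ putTMVar (ring !! 0) "hello"
  -- Three steps further on we are back at the very same cell.
  wrapped <- atomically $ tryReadTMVar (ring !! 3)
  print wrapped    -- Just "hello"
  -- The neighbouring cell is a different TMVar and is still empty.
  neighbour <- atomically $ tryReadTMVar (ring !! 1)
  print neighbour  -- Nothing
```

Because the cells are shared rather than copied, a writer cursor and a reader cursor that each advance along this list are enough to give FIFO behaviour with a fixed capacity.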
The attached module has the usual operations, but you need to pass a fixed size to new/newEmpty, and there is also an isFull test. It has no operations to resize the ring buffer created by "make".

module ProdCons (PC,new,newEmpty,
                 put,ProdCons.take,ProdCons.read,
                 tryPut,tryTake,tryRead,
                 isEmpty,isFull) where

{- Fixed bounded-buffer size solution of producer/consumer problem.
   Acts like a FIFO TMVar, blocking when capacity is reached.
   So a capacity of 1 behaves like a TMVar.
   For arbitrary capacity just use a TChan.
-}

import Control.Concurrent.STM
import Control.Concurrent
import Control.Monad.Fix
import Control.Monad
import Data.List(cycle,genericReplicate)

-- An endless, repeating list of cells; see "make" at the bottom.
type Node a = [TMVar a]

-- First TVar is the write cursor, second TVar is the read cursor.
newtype PC a = PC (TVar (Node a),TVar (Node a))

-- The node list is cyclic and never empty, so the lazy patterns (~)
-- below are safe; they make the binds irrefutable, so no MonadFail
-- instance is required.

newEmpty :: (Integral k) => k -> IO (PC a)
newEmpty k | k <= 0 = error "Need capacity > 0"
           | otherwise = do
  node <- atomically $ make k
  atomically $ do
    tv1 <- newTVar node
    tv2 <- newTVar node
    return (PC (tv1,tv2))

new :: (Integral k) => k -> a -> IO (PC a)
new k v | k <= 0 = error "Need capacity > 0"
        | otherwise = do
  pc <- newEmpty k
  atomically $ put pc v
  return pc

-- Blocks (retries) while the cell under the write cursor is still full.
put :: PC a -> a -> STM ()
put (PC (tvar,_)) value = do
  ~(tmvar:next) <- readTVar tvar
  putTMVar tmvar value
  writeTVar tvar next

-- Blocks (retries) while the cell under the read cursor is empty.
take :: PC a -> STM a
take (PC (_,tvar)) = do
  ~(tmvar:next) <- readTVar tvar
  value <- takeTMVar tmvar
  writeTVar tvar next
  return value

-- Like take, but leaves the value in the buffer.
read :: PC a -> STM a
read (PC (_,tvar)) = do
  ~(tmvar:_) <- readTVar tvar
  readTMVar tmvar

tryTake :: PC a -> STM (Maybe a)
tryTake pc = (ProdCons.take pc >>= return.Just) `orElse` (return Nothing)

tryRead :: PC a -> STM (Maybe a)
tryRead pc = (ProdCons.read pc >>= return.Just) `orElse` (return Nothing)

tryPut :: PC a -> a -> STM Bool
tryPut pc v = (put pc v >> return True) `orElse` (return False)

isEmpty :: PC a -> STM Bool
isEmpty (PC (_,tvar)) = do
  ~(tmvar:_) <- readTVar tvar
  isEmptyTMVar tmvar

isFull :: PC a -> STM Bool
isFull (PC (tvar,_)) = do
  ~(tmvar:_) <- readTVar tvar
  empty <- isEmptyTMVar tmvar
  return (not empty)

-- -- -- Internal -- -- --

make :: (Integral k) => k -> STM (Node a)
make k = liftM cycle $ sequence $ genericReplicate k newEmptyTMVar
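
For comparison, the other route mentioned earlier in the thread (a TChan wrapped with a maximum size, blocking on insert when too full) might look roughly like the sketch below. This is only an illustration under stated assumptions: the names `BoundedChan`, `newBoundedChan`, `writeBoundedChan` and `readBoundedChan` are hypothetical and not part of any library or of the attached module, and the logger in `main` is a toy.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM_, when)

-- Hypothetical bounded wrapper around TChan:
-- a counter of queued items, the fixed limit, and the channel itself.
data BoundedChan a = BoundedChan (TVar Int) Int (TChan a)

newBoundedChan :: Int -> STM (BoundedChan a)
newBoundedChan limit
  | limit <= 0 = error "Need capacity > 0"
  | otherwise  = do
      count <- newTVar 0
      chan  <- newTChan
      return (BoundedChan count limit chan)

-- Blocks (via retry) while the channel already holds `limit` items.
writeBoundedChan :: BoundedChan a -> a -> STM ()
writeBoundedChan (BoundedChan count limit chan) x = do
  n <- readTVar count
  when (n >= limit) retry
  writeTVar count (n + 1)
  writeTChan chan x

-- Blocks while the channel is empty; frees one slot on success.
readBoundedChan :: BoundedChan a -> STM a
readBoundedChan (BoundedChan count _ chan) = do
  x <- readTChan chan
  modifyTVar' count (subtract 1)
  return x

-- Toy usage: a producer that can never get more than 4 messages
-- ahead of the logging thread.
main :: IO ()
main = do
  logChan <- atomically (newBoundedChan 4)
  done    <- newEmptyTMVarIO
  _ <- forkIO $ do
    replicateM_ 10 $ atomically (readBoundedChan logChan) >>= putStrLn
    atomically (putTMVar done ())
  forM_ [1 .. 10 :: Int] $ \i ->
    atomically $ writeBoundedChan logChan ("message " ++ show i)
  atomically (takeTMVar done)
```

The trade-off against the ring buffer is that every reader and writer touches the shared counter, whereas in ProdCons the read and write cursors live in separate TVars and only meet at a cell when the buffer is empty or full.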