From your updated description, I guess `atomicModifyIORef` (or perhaps the strict version `atomicModifyIORef'`, which is probably preferable) might work for you as well, and it performs better than a 'TVar'-based approach.
Btw, it just came to my mind about the timestamps: if they are of high precision and come from different server nodes, you should not trust their total ordering, since small drifts are inevitable even when the clocks are actively synchronized.
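
A minimal sketch of the `atomicModifyIORef'` idea mentioned above, assuming a single cell that holds the most recent (timestamp, value) pair. The names `putIfNewer` and `Timestamp` are hypothetical and not taken from the gateway code; only `base` is used.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- Assumption: timestamps are plain Ints, purely for illustration.
type Timestamp = Int

-- | Store (ts, val) unless the cell already holds an entry that is at
-- least as new. Returns True when the cell was overwritten.
-- The comparison and the write happen in one atomic step.
putIfNewer :: IORef (Maybe (Timestamp, a)) -> Timestamp -> a -> IO Bool
putIfNewer ref ts val = atomicModifyIORef' ref step
  where
    step old@(Just (oldTs, _)) | oldTs >= ts = (old, False)
    step _ = (Just (ts, val), True)
```

A cell would be created with `newIORef Nothing` and inspected with `readIORef`. Note that the comparison trusts the timestamps, so the caveat about clock drift between server nodes applies to it as well.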
Dear Compl,

thanks for putting so much thought into this. Regarding your suggestion, I'm afraid I misled you somehow or I don't understand the purpose of your code. I don't need the entire history of each value, only the most recent one. Remark: The ValueNode looks like a ListT (like in list-t package) over the STM monad.

We're developing a gateway between two data protocols. We have a Python implementation, but it is not parallel enough and message parsing is too slow. Therefore we hired MLabs to implement the source protocol in Haskell [1]. The gateway does the following.

1. Parent thread reads a config and forks several concurrent child threads, each talking to a different server. The state of the children is read through TVars.
2. The child thread polls data from its assigned server, decodes the message and executes pre-defined callbacks [2], which are functions
   type Callback m = Value -> UTCTime -> m ()
3. Each data packet can contain several data items and I want these callbacks to be executed in parallel, too. This is because we expect the network IO to be the slowest part in the entire process. I use the parallel-io package for that.

{-# LANGUAGE RankNTypes #-}
import Control.Monad.Reader
import Control.Monad.IO.Unlift (MonadUnliftIO(..))
import Control.Concurrent.ParallelIO.Local (Pool,parallel_)
import qualified Data.Foldable

type CallbackStrategy m n = forall f. Foldable f => f (m ()) -> n ()

sequentialStrategy :: Applicative m => CallbackStrategy m m
sequentialStrategy = Data.Foldable.sequenceA_
-- ^ just sequence the callbacks

concurrentStrategy :: MonadUnliftIO m => CallbackStrategy m (ReaderT Pool m)
concurrentStrategy = execConcurrently . Data.Foldable.toList where
    execConcurrently actionlist = ReaderT (\pool ->
        withRunInIO (\asIO -> parallel_ pool (map asIO actionlist)))
-- ^ execute the callbacks concurrently on a pool of worker threads

I'd be quite happy with m = StateT s IO where s holds the most recent time stamp and value for each data address.
The simplest way to broadcast state in this monad would be:

broadcast :: Monoid s => TVar s -> StateT s IO () -> StateT s IO ()
broadcast var (StateT f) = StateT (\s -> do
    ((), s') <- f s
    -- merge the state after the action into the shared TVar
    atomically (modifyTVar var (\old -> old <> s'))
    return ((), s'))

That is, we run the StateT action and send the changes to the TVar afterwards. But there are two more problems with this:

(1) The concurrent callbacks all want to atomically modify the same TVar. This is a concurrency bottleneck and effectively implements Isaac Elliott's Locks. Hence it seems that giving each data address its own value TVar could allow more parallelism.

(2) StateT is inherently sequential, which defeats concurrentStrategy. In fact the unliftio-core package states:

   Note that, in order to meet the laws given below, the intuition is that a monad must have no monadic state, but may have monadic context. This essentially limits MonadUnliftIO to ReaderT and IdentityT transformers on top of IO.
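
Point (1) above could be sketched roughly as follows: one TVar per data address, allocated once at start-up, so that writers to different addresses never touch the same TVar. This is only an illustration under assumptions; `Address`, `Timestamp`, `Cell`, `newCells`, `writeIfNewer` and `snapshot` are hypothetical names, and the real gateway would use its own address, value and time types.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Hypothetical stand-ins for the real address and time types.
type Address = String
type Timestamp = Int
type Cell v = TVar (Maybe (Timestamp, v))

-- | Allocate one empty cell per configured address. The map itself is
-- never modified afterwards, so looking up a cell needs no locking.
newCells :: [Address] -> IO (Map.Map Address (Cell v))
newCells addrs = Map.fromList <$> mapM (\a -> (,) a <$> newTVarIO Nothing) addrs

-- | Overwrite a cell only if the incoming value is strictly newer.
-- The transaction reads and writes a single TVar.
writeIfNewer :: Cell v -> Timestamp -> v -> STM ()
writeIfNewer cell ts val = do
  old <- readTVar cell
  case old of
    Just (oldTs, _) | oldTs >= ts -> return ()
    _ -> writeTVar cell (Just (ts, val))

-- | Consistent snapshot of all cells, e.g. for the parent thread.
snapshot :: Map.Map Address (Cell v) -> STM (Map.Map Address (Maybe (Timestamp, v)))
snapshot = traverse readTVar
```

A callback for a given address would then run `atomically (writeIfNewer cell ts val)` on its own cell, while the parent thread can still obtain a consistent view of all addresses with `atomically (snapshot cells)`.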
Intuitively, there is no way to safely and concurrently modify the same state. Nice example of how the type system guides you towards doing the right thing.

Therefore I will go back to what Chris Smith and Bryan Richter suggested, and use ReaderT (TVar s) individually for each value. And your suggestion of TMVars is also good in finer granularity, I think. In the following, m can have a MonadUnliftIO instance.

unStateT :: MonadIO m => TMVar s -> Callback (StateT s m) -> Callback m
unStateT var cb = \value time -> do
    before <- (liftIO.atomically) (takeTMVar var)
    -- TMVar is now empty, no other thread can access it
    after <- execStateT (cb value time) before
    (liftIO.atomically) (putTMVar var after)
    -- TMVar now full again

Thanks everyone for helping!
Olaf

[1] https://github.com/mlabs-haskell/opc-xml-da-client
[2] I suppose in FP-land we'd name these continuations, not callbacks. Not sure whether the continuation monad abstraction buys me any advantage here, though.

On Sat, 2021-09-04 at 18:50 +0800, YueCompl wrote:
Oh, a bugfix: the new tail reference should be returned after update, so a thread local stream reference can technically be cached.
And I realize this is more than you originally needed; never mind if it's not so useful to you.
{-# LANGUAGE LambdaCase, ScopedTypeVariables #-}

import Control.Concurrent.STM
import Control.Monad.IO.Class (MonadIO, liftIO)

data ValueNode a = ValueNode
  { node'value :: a,
    node'timestamp :: Timestamp,
    node'next :: ValueSink a
  }

type ValueSink a = TMVar (ValueNode a)

type Timestamp = Int

seekTail ::
  forall a.
  (a -> Timestamp -> a -> Timestamp -> a) ->
  ValueSink a ->
  STM (ValueSink a, Maybe (ValueNode a))
seekTail f sink = go sink Nothing
  where
    go ::
      ValueSink a ->
      Maybe (ValueNode a) ->
      STM (ValueSink a, Maybe (ValueNode a))
    go ref prevNode =
      tryReadTMVar ref >>= \case
        Nothing -> return (ref, prevNode)
        Just self@(ValueNode spotVal spotTs nxt) ->
          go nxt $
            Just
              self
                { node'value = case prevNode of
                    Nothing -> spotVal
                    Just (ValueNode prevVal prevTs _prevNxt) ->
                      f prevVal prevTs spotVal spotTs
                }

updateValue ::
  forall a m.
  MonadIO m =>
  (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
  ValueSink a ->
  m (ValueSink a)
updateValue f sink = do
  (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
  case tailNode of
    Nothing -> do
      (myVal, myTs) <- f Nothing
      liftIO $
        atomically $ do
          nxt <- newEmptyTMVar
          stored <- tryPutTMVar tailRef $ ValueNode myVal myTs nxt
          -- if another thread filled the tail in the meantime, our node is
          -- discarded and 'nxt' is not linked into the stream, so hand back
          -- the still-valid 'tailRef' instead
          return $ if stored then nxt else tailRef
    Just (ValueNode spotVal spotTs spotNxt) -> do
      (myVal, myTs) <- f $ Just (spotVal, spotTs)
      newNxt <- liftIO newEmptyTMVarIO
      let newTail = ValueNode myVal myTs newNxt
          putAsNewTailOrDiscard :: ValueSink a -> STM ()
          putAsNewTailOrDiscard nodeRef =
            putTMVar nodeRef newTail `orElse` yetOther'sTail
            where
              yetOther'sTail = do
                (ValueNode _other'sVal other'sTs other'sNxt) <-
                  readTMVar nodeRef
                if other'sTs >= myTs
                  then return ()
                  else putAsNewTailOrDiscard other'sNxt
      liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
      return spotNxt
  where
    justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
    justLatest _prevVal _prevTs spotVal _spotTs = spotVal

-- Each concurrent thread is supposed to have its local 'ValueSink' reference
-- "cached" over time, but keep in mind that for any such thread who is slow
-- in unfolding the value stream, the historical values will pile up in heap.

On 2021-09-04, at 18:42, YueCompl <compl.yue@icloud.com> wrote:
I'd like to add a new feature: you can fold the historic value stream when deriving the new state value. It then becomes:
{-# LANGUAGE LambdaCase, ScopedTypeVariables #-}

import Control.Concurrent.STM
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)

data ValueNode a = ValueNode
  { node'value :: a,
    node'timestamp :: Timestamp,
    node'next :: ValueSink a
  }

type ValueSink a = TMVar (ValueNode a)

type Timestamp = Int

seekTail ::
  forall a.
  (a -> Timestamp -> a -> Timestamp -> a) ->
  ValueSink a ->
  STM (ValueSink a, Maybe (ValueNode a))
seekTail f sink = go sink Nothing
  where
    go ::
      ValueSink a ->
      Maybe (ValueNode a) ->
      STM (ValueSink a, Maybe (ValueNode a))
    go ref prevNode =
      tryReadTMVar ref >>= \case
        Nothing -> return (ref, prevNode)
        Just self@(ValueNode spotVal spotTs nxt) ->
          go nxt $
            Just
              self
                { node'value = case prevNode of
                    Nothing -> spotVal
                    Just (ValueNode prevVal prevTs _prevNxt) ->
                      f prevVal prevTs spotVal spotTs
                }

updateValue ::
  forall a m.
  MonadIO m =>
  (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
  ValueSink a ->
  m ()
updateValue f sink = do
  (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
  case tailNode of
    Nothing -> do
      (myVal, myTs) <- f Nothing
      liftIO $
        atomically $ do
          nxt <- newEmptyTMVar
          void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
    Just (ValueNode spotVal spotTs spotNxt) -> do
      (myVal, myTs) <- f $ Just (spotVal, spotTs)
      newNxt <- liftIO newEmptyTMVarIO
      let newTail = ValueNode myVal myTs newNxt
          putAsNewTailOrDiscard :: ValueSink a -> STM ()
          putAsNewTailOrDiscard nodeRef =
            putTMVar nodeRef newTail `orElse` yetOther'sTail
            where
              yetOther'sTail = do
                (ValueNode _other'sVal other'sTs other'sNxt) <-
                  readTMVar nodeRef
                if other'sTs >= myTs
                  then return ()
                  else putAsNewTailOrDiscard other'sNxt
      liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
  where
    justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
    justLatest _prevVal _prevTs spotVal _spotTs = spotVal

-- Each concurrent thread is supposed to have its local 'ValueSink' reference
-- "cached" over time, but keep in mind that for any such thread who is slow
-- in unfolding the value stream, the historical values will pile up in heap.

On 2021-09-03, at 16:26, YueCompl <compl.yue@icloud.com> wrote:
It's a bit sad that I'm not mathematically minded enough to understand you at that abstract level. But I have a more imperative solution in mind, wrt the question:
"server, tell me if there is a value of x newer than t."
and do further mutate-or-giveup, like this:
{-# LANGUAGE LambdaCase, ScopedTypeVariables #-}

import Control.Concurrent.STM
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)

data ValueNode a = ValueNode
  { node'value :: a,
    node'timestamp :: Timestamp,
    node'next :: ValueSink a
  }

type ValueSink a = TMVar (ValueNode a)

type Timestamp = Int

seekTail :: ValueSink a -> STM (ValueSink a, Maybe (ValueNode a))
seekTail sink = go sink Nothing
  where
    go ref ancestor =
      tryReadTMVar ref >>= \case
        Nothing -> return (ref, ancestor)
        Just self@(ValueNode _ _ nxt) -> go nxt $ Just self

updateValue ::
  forall a m.
  MonadIO m =>
  (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
  ValueSink a ->
  m ()
updateValue f sink = do
  (tailRef, tailNode) <- liftIO $ atomically $ seekTail sink
  case tailNode of
    Nothing -> do
      (myVal, myTs) <- f Nothing
      liftIO $
        atomically $ do
          nxt <- newEmptyTMVar
          void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
    Just (ValueNode seenVal seenTs seenNxt) -> do
      (myVal, myTs) <- f $ Just (seenVal, seenTs)
      newNxt <- liftIO newEmptyTMVarIO
      let newTail = ValueNode myVal myTs newNxt
          putAsNewTailOrDiscard :: ValueSink a -> STM ()
          putAsNewTailOrDiscard nodeRef =
            putTMVar nodeRef newTail `orElse` yetOther'sTail
            where
              yetOther'sTail = do
                (ValueNode _other'sVal other'sTs other'sNxt) <-
                  readTMVar nodeRef
                if other'sTs >= myTs
                  then return ()
                  else putAsNewTailOrDiscard other'sNxt
      liftIO $ atomically $ putAsNewTailOrDiscard seenNxt

-- Each concurrent thread is supposed to have its local 'ValueSink' reference
-- "cached" over time, but keep in mind that for any such thread who is slow
-- in unfolding the value stream, the historical values will pile up in heap.

On 2021-09-03, at 01:42, Olaf Klinke <olf@aatal-apotheke.de> wrote:
On Fri, 2021-09-03 at 00:00 +0800, YueCompl wrote:
Um, I'm not sure I understand your case right, but if the "mutation", rather than the "mutated result", can be computed (perhaps non-trivially) from a possibly outdated state, and the "mutation" can be trivially applied, I think `modifyTVar'` is the way to go. `readTVar` can be used to obtain an almost up-to-date state on demand, at low frequency.
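
One possible reading of the `modifyTVar'` suggestion above, as a hedged sketch: the mutation is computed outside STM from a snapshot that may already be stale, and only its application is atomic. `Store` and `applyMutation` are hypothetical names, and the "mutation" is assumed to be a map of entries that overwrite the old ones.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Hypothetical state: one Int value per address.
type Store = Map.Map String Int

-- | Compute a mutation from a possibly outdated snapshot, then apply it
-- in a short transaction.
applyMutation :: TVar Store -> (Store -> IO Store) -> IO ()
applyMutation var compute = do
  seen <- readTVarIO var    -- may be stale by the time we commit
  delta <- compute seen     -- potentially expensive, runs outside STM
  -- Map.union is left-biased, so entries in delta overwrite old ones
  atomically $ modifyTVar' var (Map.union delta)
```

This stays correct only as long as applying `delta` to a state newer than `seen` is still meaningful, which is the precondition stated in the quoted paragraph.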
To be concrete, my state is a collection of time stamped values, where
the monoid operation overwrites old values with new ones.
But I need to know the current state (x,t) to determine the "mutation",
because I'll be asking questions like "server, tell me if there is a
value of x newer than t."
Any observer whose initial state is synchronized with the worker thread
can in principle re-construct the worker's internal state by observing
the stream of emitted "mutations".
The most general abstraction would be that of a monoid action on a
type, but in my case the monoid (mutations) and the mutated type are
identical.
act :: m -> a -> a
act mempty = id
act (x <> y) = act x . act y -- monoid homomorphism
act (x <> x) = act x -- idempotent
Olaf
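
The monoid of time-stamped values described above, acting on itself, might be sketched like this. `Latest` and `Timestamp` are hypothetical names, and the tie-breaking rule for equal timestamps (left operand wins) is an arbitrary assumption made for the example.

```haskell
import qualified Data.Map.Strict as Map

-- Assumption: timestamps are plain Ints, purely for illustration.
type Timestamp = Int

-- | Hypothetical state type: the latest (timestamp, value) per key.
newtype Latest k v = Latest (Map.Map k (Timestamp, v))
  deriving (Eq, Show)

-- | Combine two states, keeping the newer entry for each key.
-- On equal timestamps the left operand wins.
instance Ord k => Semigroup (Latest k v) where
  Latest x <> Latest y = Latest (Map.unionWith newer x y)
    where
      newer l r = if fst l >= fst r then l else r

instance Ord k => Monoid (Latest k v) where
  mempty = Latest Map.empty

-- | The monoid acting on itself: mutations and mutated state coincide.
act :: Ord k => Latest k v -> Latest k v -> Latest k v
act = (<>)
```

With this definition `act mempty = id` holds because the union with an empty map changes nothing, the homomorphism law is associativity of `<>`, and idempotence follows from `x <> x == x`.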