On 2021-09-04, at 18:42, YueCompl <compl.yue@icloud.com> wrote:

I'd like to add a new feature: you can fold the historic value stream
when deriving the new state value. Then it becomes:

    {-# LANGUAGE LambdaCase #-}
    {-# LANGUAGE ScopedTypeVariables #-}

    import Control.Concurrent.STM
    import Control.Monad (void)
    import Control.Monad.IO.Class (MonadIO, liftIO)

    data ValueNode a = ValueNode
      { node'value :: a,
        node'timestamp :: Timestamp,
        node'next :: ValueSink a
      }

    type ValueSink a = TMVar (ValueNode a)

    type Timestamp = Int

    seekTail ::
      forall a.
      (a -> Timestamp -> a -> Timestamp -> a) ->
      ValueSink a ->
      STM (ValueSink a, Maybe (ValueNode a))
    seekTail f sink = go sink Nothing
      where
        go ::
          ValueSink a ->
          Maybe (ValueNode a) ->
          STM (ValueSink a, Maybe (ValueNode a))
        go ref prevNode =
          tryReadTMVar ref >>= \case
            Nothing -> return (ref, prevNode)
            Just self@(ValueNode spotVal spotTs nxt) ->
              go nxt $
                Just
                  self
                    { node'value = case prevNode of
                        Nothing -> spotVal
                        Just (ValueNode prevVal prevTs _prevNxt) ->
                          f prevVal prevTs spotVal spotTs
                    }

    updateValue ::
      forall a m.
      MonadIO m =>
      (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
      ValueSink a ->
      m ()
    updateValue f sink = do
      (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
      case tailNode of
        Nothing -> do
          (myVal, myTs) <- f Nothing
          liftIO $
            atomically $ do
              nxt <- newEmptyTMVar
              void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
        Just (ValueNode spotVal spotTs spotNxt) -> do
          (myVal, myTs) <- f $ Just (spotVal, spotTs)
          newNxt <- liftIO newEmptyTMVarIO
          let newTail = ValueNode myVal myTs newNxt

              putAsNewTailOrDiscard :: ValueSink a -> STM ()
              putAsNewTailOrDiscard nodeRef =
                putTMVar nodeRef newTail `orElse` yetOther'sTail
                where
                  yetOther'sTail = do
                    (ValueNode _other'sVal other'sTs other'sNxt) <-
                      readTMVar nodeRef
                    if other'sTs >= myTs
                      then return ()
                      else putAsNewTailOrDiscard other'sNxt

          liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
      where
        justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
        justLatest _prevVal _prevTs spotVal _spotTs = spotVal

    -- Each concurrent thread is supposed to have its local 'ValueSink' reference
    -- "cached" over time, but keep in mind that for any such thread who is slow
    -- in unfolding the value stream, the historical values will pile up in heap.

On 2021-09-03, at 16:26, YueCompl <compl.yue@icloud.com> wrote:

It's a bit sad that I'm not mathematically minded enough to follow you
at that level of abstraction. But I have a more imperative solution in
mind, wrt the question:

"server, tell me if there is a value of x newer than t."

and then doing a further mutate-or-give-up, like this:

    {-# LANGUAGE LambdaCase #-}
    {-# LANGUAGE ScopedTypeVariables #-}

    import Control.Concurrent.STM
    import Control.Monad (void)
    import Control.Monad.IO.Class (MonadIO, liftIO)

    data ValueNode a = ValueNode
      { node'value :: a,
        node'timestamp :: Timestamp,
        node'next :: ValueSink a
      }

    type ValueSink a = TMVar (ValueNode a)

    type Timestamp = Int

    seekTail :: ValueSink a -> STM (ValueSink a, Maybe (ValueNode a))
    seekTail sink = go sink Nothing
      where
        go ref ancestor =
          tryReadTMVar ref >>= \case
            Nothing -> return (ref, ancestor)
            Just self@(ValueNode _ _ nxt) -> go nxt $ Just self

    updateValue ::
      forall a m.
      MonadIO m =>
      (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
      ValueSink a ->
      m ()
    updateValue f sink = do
      (tailRef, tailNode) <- liftIO $ atomically $ seekTail sink
      case tailNode of
        Nothing -> do
          (myVal, myTs) <- f Nothing
          liftIO $
            atomically $ do
              nxt <- newEmptyTMVar
              void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
        Just (ValueNode seenVal seenTs seenNxt) -> do
          (myVal, myTs) <- f $ Just (seenVal, seenTs)
          newNxt <- liftIO newEmptyTMVarIO
          let newTail = ValueNode myVal myTs newNxt

              putAsNewTailOrDiscard :: ValueSink a -> STM ()
              putAsNewTailOrDiscard nodeRef =
                putTMVar nodeRef newTail `orElse` yetOther'sTail
                where
                  yetOther'sTail = do
                    (ValueNode _other'sVal other'sTs other'sNxt) <-
                      readTMVar nodeRef
                    if other'sTs >= myTs
                      then return ()
                      else putAsNewTailOrDiscard other'sNxt

          liftIO $ atomically $ putAsNewTailOrDiscard seenNxt

    -- Each concurrent thread is supposed to have its local 'ValueSink' reference
    -- "cached" over time, but keep in mind that for any such thread who is slow
    -- in unfolding the value stream, the historical values will pile up in heap.

On 2021-09-03, at 01:42, Olaf Klinke <olf@aatal-apotheke.de> wrote:

On Fri, 2021-09-03 at 00:00 +0800, YueCompl wrote:
> Um, I'm not sure I understand your case right, but if the "mutation"
> (rather than the "mutated result") can be computed, perhaps
> non-trivially, from a possibly outdated state, and the "mutation" can
> then be applied trivially, I think `modifyTVar'` is the way to go.
> `readTVar` can be used to obtain an almost up-to-date state on demand,
> at low frequency.
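For illustration, the `modifyTVar'` suggestion might look roughly like the
sketch below. The `State` type and the names `newerWins`, `submit` and `peek`
are assumptions made up for this example, not something from the thread: a
candidate is computed outside any transaction and then applied atomically,
being discarded if the stored state is already at least as new.

```haskell
import Control.Concurrent.STM
  (TVar, atomically, modifyTVar', newTVarIO, readTVarIO)

type Timestamp = Int

-- Hypothetical state: the newest (value, timestamp) pair seen so far.
type State = (Int, Timestamp)

-- The cheap-to-apply "mutation": keep the candidate only if it is
-- strictly newer than whatever is currently stored.
newerWins :: State -> State -> State
newerWins cand@(_, candTs) cur@(_, curTs)
  | candTs > curTs = cand
  | otherwise = cur

-- Apply a candidate that was computed earlier, possibly from stale data.
submit :: TVar State -> State -> IO ()
submit ref cand = atomically $ modifyTVar' ref (newerWins cand)

-- Take an almost up-to-date snapshot on demand.
peek :: TVar State -> IO State
peek = readTVarIO

-- Create a fresh cell holding an initial state.
newCell :: State -> IO (TVar State)
newCell = newTVarIO
```

Because `newerWins` re-checks the timestamp inside the transaction, it does
not matter how old the snapshot was that the candidate was derived from.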
To be concrete, my state is a collection of timestamped values, where
the monoid operation overwrites old values with new ones.
But I need to know the current state (x,t) to determine the "mutation",
because I'll be asking questions like "server, tell me if there is a
value of x newer than t."
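A minimal sketch of such a state, assuming a map from keys to
(value, timestamp) pairs; the names `Stamped` and `newerThan` are hypothetical
and only serve the illustration. The semigroup operation keeps, per key, the
entry with the larger timestamp, and `newerThan` answers the question quoted
above.

```haskell
import qualified Data.Map.Strict as Map

type Timestamp = Int

-- Hypothetical state type: per key, the latest value and its timestamp.
newtype Stamped k v = Stamped (Map.Map k (v, Timestamp))
  deriving (Eq, Show)

-- Combining two states keeps, for every key, the entry with the larger
-- timestamp. On equal timestamps the left entry is kept.
instance Ord k => Semigroup (Stamped k v) where
  Stamped l <> Stamped r = Stamped (Map.unionWith newer l r)
    where
      newer a@(_, ta) b@(_, tb) = if tb > ta then b else a

instance Ord k => Monoid (Stamped k v) where
  mempty = Stamped Map.empty

-- "Is there a value of x newer than t?"
newerThan :: Ord k => k -> Timestamp -> Stamped k v -> Maybe (v, Timestamp)
newerThan x t (Stamped m) = case Map.lookup x m of
  Just e@(_, ts) | ts > t -> Just e
  _ -> Nothing
```

With this definition `s <> s == s` holds for every `s`, which is the
idempotence asked for further down.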
Any observer whose initial state is synchronized with the worker thread
can in principle re-construct the worker's internal state by observing
the stream of emitted "mutations".
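The reconstruction can be sketched as a fold over the emitted mutations.
The names `replay` and `history` are assumptions for this example, and
`Max Int` merely stands in for the real state type. Each new mutation is
combined on the left of the current state, matching a left action.

```haskell
import Data.List (foldl')
import Data.Semigroup (Max (..))

-- Starting from a synchronized state, apply the emitted mutations in
-- order. State and mutations share one type, so applying a mutation m
-- to a state s is just m <> s.
replay :: Semigroup m => m -> [m] -> m
replay = foldl' (\s m -> m <> s)

-- Every state the worker went through, oldest first.
history :: Semigroup m => m -> [m] -> [m]
history = scanl (\s m -> m <> s)

-- Stand-in example: the state is the largest number seen so far.
example :: Max Int
example = replay (Max 3) [Max 1, Max 7, Max 5]
```

An observer that starts from the same initial state and sees the same
mutations in the same order ends up in the same state as the worker, since
`replay` is a pure function of those two inputs.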
The most general abstraction would be that of a monoid action on a
type, but in my case the monoid (mutations) and the mutated type are
identical.
act :: m -> a -> a
act mempty   = id            -- identity
act (x <> y) = act x . act y -- monoid homomorphism
act (x <> x) = act x         -- idempotent
Olaf
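For the special case where the mutations and the mutated type coincide, the
laws above can be spot-checked with a small sketch. It assumes the action is
simply `(<>)`; the `law...` names are made up for this example. `Max Int`
satisfies all three laws, while `Sum Int` fails idempotence.

```haskell
import Data.Monoid (Sum (..))
import Data.Semigroup (Max (..))

-- The monoid acting on itself: applying a mutation is (<>).
act :: Monoid m => m -> m -> m
act = (<>)

-- act mempty = id
lawIdentity :: (Monoid m, Eq m) => m -> Bool
lawIdentity a = act mempty a == a

-- act (x <> y) = act x . act y
lawHomomorphism :: (Monoid m, Eq m) => m -> m -> m -> Bool
lawHomomorphism x y a = act (x <> y) a == (act x . act y) a

-- act (x <> x) = act x
lawIdempotent :: (Monoid m, Eq m) => m -> m -> Bool
lawIdempotent x a = act (x <> x) a == act x a

-- Overwriting with the maximum is idempotent, adding is not.
maxIsIdempotent, sumIsIdempotent :: Bool
maxIsIdempotent = lawIdempotent (Max (4 :: Int)) (Max 1)
sumIsIdempotent = lawIdempotent (Sum (1 :: Int)) (Sum 0)
```

These are checks on sample values only, not proofs; a property-testing
library could run the same predicates over many inputs.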