
I'd like to add a new feature that you can fold the historic value stream in deriving the new state value, then it becomes: data ValueNode a = ValueNode { node'value :: a, node'timestamp :: Timestamp, node'next :: ValueSink a } type ValueSink a = TMVar (ValueNode a) type Timestamp = Int seekTail :: forall a. (a -> Timestamp -> a -> Timestamp -> a) -> ValueSink a -> STM (ValueSink a, Maybe (ValueNode a)) seekTail f sink = go sink Nothing where go :: ValueSink a -> Maybe (ValueNode a) -> STM (ValueSink a, Maybe (ValueNode a)) go ref prevNode = tryReadTMVar ref >>= \case Nothing -> return (ref, prevNode) Just self@(ValueNode spotVal spotTs nxt) -> go nxt $ Just self { node'value = case prevNode of Nothing -> spotVal Just (ValueNode prevVal prevTs _prevNxt) -> f prevVal prevTs spotVal spotTs } updateValue :: forall a m. MonadIO m => (Maybe (a, Timestamp) -> m (a, Timestamp)) -> ValueSink a -> m () updateValue f sink = do (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink case tailNode of Nothing -> do (myVal, myTs) <- f Nothing liftIO $ atomically $ do nxt <- newEmptyTMVar void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt Just (ValueNode spotVal spotTs spotNxt) -> do (myVal, myTs) <- f $ Just (spotVal, spotTs) newNxt <- liftIO newEmptyTMVarIO let newTail = ValueNode myVal myTs newNxt putAsNewTailOrDiscard :: ValueSink a -> STM () putAsNewTailOrDiscard nodeRef = putTMVar nodeRef newTail `orElse` yetOther'sTail where yetOther'sTail = do (ValueNode _other'sVal other'sTs other'sNxt) <- readTMVar nodeRef if other'sTs >= myTs then return () else putAsNewTailOrDiscard other'sNxt liftIO $ atomically $ putAsNewTailOrDiscard spotNxt where justLatest :: (a -> Timestamp -> a -> Timestamp -> a) justLatest _prevVal _prevTs spotVal _spotTs = spotVal -- Each concurrent thread is supposed to have its local 'ValueSink' reference -- "cached" over time, but keep in mind that for any such thread who is slow -- in unfolding the value stream, the historical values will pile up in heap.
On 2021-09-03, at 16:26, YueCompl
wrote: It's a bit sad that I'm not so mathematically minded to understand you in that abstract level. But I have a more imperative solution in my mind, wrt the question:
"server, tell me if there is a value of x newer than t."
and do further mutate-or-giveup, like this:
data ValueNode a = ValueNode { node'value :: a, node'timestamp :: Timestamp, node'next :: ValueSink a }
type ValueSink a = TMVar (ValueNode a)
type Timestamp = Int
seekTail :: ValueSink a -> STM (ValueSink a, Maybe (ValueNode a)) seekTail sink = go sink Nothing where go ref ancestor = tryReadTMVar ref >>= \case Nothing -> return (ref, ancestor) Just self@(ValueNode _ _ nxt) -> go nxt $ Just self
updateValue :: forall a m. MonadIO m => (Maybe (a, Timestamp) -> m (a, Timestamp)) -> ValueSink a -> m () updateValue f sink = do (tailRef, tailNode) <- liftIO $ atomically $ seekTail sink case tailNode of Nothing -> do (myVal, myTs) <- f Nothing liftIO $ atomically $ do nxt <- newEmptyTMVar void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt Just (ValueNode seenVal seenTs seenNxt) -> do (myVal, myTs) <- f $ Just (seenVal, seenTs) newNxt <- liftIO newEmptyTMVarIO let newTail = ValueNode myVal myTs newNxt
putAsNewTailOrDiscard :: ValueSink a -> STM () putAsNewTailOrDiscard nodeRef = putTMVar nodeRef newTail `orElse` yetOther'sTail where yetOther'sTail = do (ValueNode _other'sVal other'sTs other'sNxt) <- readTMVar nodeRef if other'sTs >= myTs then return () else putAsNewTailOrDiscard other'sNxt
liftIO $ atomically $ putAsNewTailOrDiscard seenNxt
-- Each concurrent thread is supposed to have its local 'ValueSink' reference -- "cached" over time, but keep in mind that for any such thread who is slow -- in unfolding the value stream, the historical values will pile up in heap.
On 2021-09-03, at 01:42, Olaf Klinke
mailto:olf@aatal-apotheke.de> wrote: On Fri, 2021-09-03 at 00:00 +0800, YueCompl wrote:
Um, I'm not sure I understand your case right, but if the "mutation" instead of the "mutated result" can be (might non-trivially) computed from a possibly outdated state, and the "mutation" can be trivially applied, I think `modifyTVar'` is the way to go. `readTVar` can be used to obtain an almost up-to-date state on demand, at low frequency.
To be concrete, my state is a collection of time stamped values, where the monoid operation overwrites old values with new ones. But I need to know the current state (x,t) to determine the "mutation", because I'll be asking questions like "server, tell me if there is a value of x newer than t." Any observer whose initial state is synchronized with the worker thread can in principle re-construct the worker's internal state by observing the stream of emitted "mutations".
The most general abstraction would be that of a monoid action on a type, but in my case the monoid (mutations) and the mutated type are identical.
act :: m -> a -> a act memtpy = id act (x <> y) = act x . act y -- monoid homomorphism act (x <> x) = act x -- idempotent
Olaf