I happen to be pondering ideas about mutable yet shared data/state. Unlike the actor model, a Turing machine has no notion of external change notification, and I wonder whether this limitation propagates to the lambda calculus. If so, there seems to be no approved way in the pure functional paradigm to handle concurrent mutation of shared state, since it's not modeled in the first place.

From your updated description, I guess `atomicModifyIORef` (or perhaps the strict version `atomicModifyIORef'`, which may be more desirable) might work for you as well, and it should also perform better than the `TVar`-based approach.
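
A minimal sketch of that idea, assuming a single shared cell that holds the most recent time-stamped value. The names `Stamped` and `putIfNewer` and the concrete `Int`/`String` types are hypothetical, chosen only for illustration. Note that the function passed to `atomicModifyIORef'` must be pure, so this only covers the final compare-and-store step, not any IO done while computing the new value.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- Hypothetical types for illustration: an Int time stamp and a String payload.
type Timestamp = Int
type Stamped = (Timestamp, String)

-- Atomically keep whichever entry carries the newer time stamp.
-- Returns True when the candidate replaced the stored entry.
putIfNewer :: IORef Stamped -> Stamped -> IO Bool
putIfNewer ref new@(newTs, _) =
  atomicModifyIORef' ref $ \old@(oldTs, _) ->
    if newTs > oldTs then (new, True) else (old, False)

-- Usage: the second write is older than the stored entry and is discarded.
demo :: IO (Bool, Bool, Stamped)
demo = do
  ref <- newIORef (1, "a")
  r1 <- putIfNewer ref (2, "b")
  r2 <- putIfNewer ref (1, "c")
  v <- readIORef ref
  return (r1, r2, v)
```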

Btw, it just came to my mind regarding the timestamps: if they are of high precision and come from different server nodes, you should not trust their total ordering, since small drifts are inevitable even when the clocks are actively synchronized.
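
One way to make that caution explicit is to claim an order only when two timestamps differ by more than an assumed skew bound. This is only a sketch: the `Micros` unit, the `Order` type and the `maxSkew` parameter are hypothetical, and a usable skew bound would have to come from knowledge of the actual deployment.

```haskell
-- Hypothetical: time stamps in microseconds and an assumed bound on clock skew.
type Micros = Integer

data Order = Earlier | Later | Unordered
  deriving (Eq, Show)

-- Only claim an order when the gap exceeds the assumed skew between nodes.
compareAcrossNodes :: Micros -> Micros -> Micros -> Order
compareAcrossNodes maxSkew t1 t2
  | t2 - t1 > maxSkew = Earlier     -- t1 is safely before t2
  | t1 - t2 > maxSkew = Later       -- t1 is safely after t2
  | otherwise         = Unordered   -- within the skew window: do not trust the order
```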

On 2021-09-06, at 17:04, Olaf Klinke <olf@aatal-apotheke.de> wrote:

Dear Compl, 

thanks for putting so much thought into this. Regarding your
suggestion, I'm afraid I misled you somehow or I don't understand the
purpose of your code. I don't need the entire history of each value,
only the most recent one. Remark: The ValueNode looks like a ListT
(like in list-t package) over the STM monad.

We're developing a gateway between two data protocols. We have a Python
implementation, but it is not parallel enough and message parsing is
too slow. Therefore we hired MLabs to implement the source protocol in
Haskell [1]. The gateway does the following. 

1. Parent thread reads a config and forks several concurrent child
threads, each talking to a different server. The state of the children
is read through TVars.
2. The child thread polls data from its assigned server, decodes the
message and executes pre-defined callbacks [2], which are functions
   type Callback m = Value -> UTCTime -> m ().
3. Each data packet can contain several data items and I want these
callbacks to be executed in parallel, too. This is because we expect
the network IO to be the slowest part in the entire process. I use the
parallel-io package for that. 

{-# LANGUAGE RankNTypes #-}
import Control.Monad.Reader
import Control.Monad.IO.Unlift (MonadUnliftIO(..))
import Control.Concurrent.ParallelIO.Local (Pool,parallel_)
import qualified Data.Foldable

type CallbackStrategy m n = forall f. Foldable f => 
  f (m ()) -> n ()
sequentialStrategy :: Applicative m => CallbackStrategy m m
sequentialStrategy = Data.Foldable.sequenceA_    
-- ^ just sequence the callbacks

concurrentStrategy :: MonadUnliftIO m => 
  CallbackStrategy m (ReaderT Pool m)
concurrentStrategy = execConcurrently . 
 Data.Foldable.toList where
   execConcurrently actionlist = ReaderT (\pool -> 
    withRunInIO (\asIO -> parallel_ pool (map asIO actionlist)))
-- ^ execute the callbacks concurrently on a pool of worker threads

I'd be quite happy with m = StateT s IO where s holds the most recent
time stamp and value for each data address. The simplest way to
broadcast state in this monad would be: 

broadcast :: Monoid s => TVar s -> StateT s IO () -> StateT s IO ()
broadcast var (StateT f) = StateT (\s -> do
 (a, s') <- f s
 -- merge the new local state into the shared TVar
 atomically (modifyTVar var (\old -> old <> s'))
 return (a, s'))

That is, we run the StateT action and send the changes to the TVar
afterwards. But there are two more problems with this:

(1) The concurrent callbacks all want to atomically modify the same
TVar. This is a concurrency bottleneck and effectively implements Isaac
Elliott's Locks. Hence it seems that giving each data address its own
value TVar could allow more parallelism. 

(2) StateT is inherently sequential, which defeats concurrentStrategy.
In fact the unliftio-core package states:
  "Note that, in order to meet the laws given below, the intuition is
  that a monad must have no monadic state, but may have monadic
  context. This essentially limits MonadUnliftIO to ReaderT and
  IdentityT transformers on top of IO."
Intuitively, there is no way to safely and concurrently modify the same
state. Nice example of how the type system guides you towards doing the
right thing. 
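
Point (1) above can be sketched as follows: a minimal example assuming one TVar per data address, stored in a map that is built once at start-up. The names `Address`, `Stamped`, `Store`, `newStore` and `updateAddress`, as well as the concrete `String`/`Int`/`Double` types, are hypothetical stand-ins for the real ones.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Hypothetical stand-ins for the real address, time stamp and value types.
type Address = String
type Stamped = (Int, Double)   -- (time stamp, value)

-- The map itself is immutable after start-up; only the per-address cells change.
type Store = Map.Map Address (TVar (Maybe Stamped))

newStore :: [Address] -> IO Store
newStore addrs = Map.fromList <$> mapM (\a -> (,) a <$> newTVarIO Nothing) addrs

-- Overwrite the cell of one address if the new entry is strictly newer.
-- Transactions on different addresses touch disjoint TVars.
updateAddress :: Store -> Address -> Stamped -> STM Bool
updateAddress store addr new@(newTs, _) =
  case Map.lookup addr store of
    Nothing -> return False            -- unknown address: ignore
    Just cell -> do
      old <- readTVar cell
      case old of
        Just (oldTs, _) | oldTs >= newTs -> return False
        _ -> writeTVar cell (Just new) >> return True
```

Since each callback only reads and writes the TVar of its own address, callbacks for different addresses should not invalidate each other's transactions.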

Therefore I will go back to what Chris Smith and Bryan Richter
suggested, and use ReaderT (TVar s) individually for each value. And
your suggestion of TMVars is also good in finer granularity, I think.
In the following, m can have a MonadUnliftIO instance. 

unStateT :: MonadIO m => TMVar s -> 
 Callback (StateT s m) -> Callback m
unStateT var cb = \value time -> do
   before <- (liftIO.atomically) (takeTMVar var)
   -- TMVar is now empty, no other thread can access it
   after <- execStateT (cb value time) before
   (liftIO.atomically) (putTMVar var after)
   -- TMVar now full again
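
One caveat of the take/put pattern: if the callback throws between takeTMVar and putTMVar, the TMVar stays empty and later callbacks on the same value block. A possible variation, sketched here under the assumption that m = IO and with the hypothetical name `unStateTSafe`, puts the previous state back on an exception. It uses Control.Monad.Trans.State.Strict from transformers instead of mtl only to keep the example self-contained.

```haskell
import Control.Concurrent.STM
import Control.Exception (IOException, mask, onException, throwIO, try)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify)

-- Variant of the take/put pattern, specialised to IO for brevity.
-- If the callback throws, the previous state is put back so that the
-- TMVar does not stay empty.
unStateTSafe :: TMVar s -> (v -> t -> StateT s IO ()) -> v -> t -> IO ()
unStateTSafe var cb value time = mask $ \restore -> do
  before <- atomically (takeTMVar var)
  after <- restore (execStateT (cb value time) before)
             `onException` atomically (putTMVar var before)
  atomically (putTMVar var after)

-- Usage: one successful callback, then one that fails.
demo :: IO Int
demo = do
  var <- newTMVarIO (0 :: Int)
  unStateTSafe var (\v _ -> modify (+ v)) 5 ()
  _ <- try (unStateTSafe var (\_ _ -> lift (throwIO (userError "boom"))) (0 :: Int) ())
         :: IO (Either IOException ())
  atomically (readTMVar var)
```

In `demo` the failing callback leaves the state at the value written by the first one, and the TMVar is full again afterwards.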

Thanks everyone for helping!
Olaf 


[1] https://github.com/mlabs-haskell/opc-xml-da-client
[2] I suppose in FP-land we'd name these continuations, not callbacks.
Not sure whether the continuation monad abstraction buys me any
advantage here, though. 

On Sat, 2021-09-04 at 18:50 +0800, YueCompl wrote:
Oh, a bugfix: the new tail reference should be returned after the update, so that a thread-local stream reference can technically be cached.

And I realize this is more than you originally needed; never mind if it's not so useful to you.


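{-# LANGUAGE LambdaCase, ScopedTypeVariables #-}
import Control.Concurrent.STM
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)

data ValueNode a = ValueNode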
 { node'value :: a,
   node'timestamp :: Timestamp,
   node'next :: ValueSink a
 }

type ValueSink a = TMVar (ValueNode a)

type Timestamp = Int

seekTail ::
 forall a.
 (a -> Timestamp -> a -> Timestamp -> a) ->
 ValueSink a ->
 STM (ValueSink a, Maybe (ValueNode a))
seekTail f sink = go sink Nothing
 where
   go ::
     ValueSink a ->
     Maybe (ValueNode a) ->
     STM (ValueSink a, Maybe (ValueNode a))
   go ref prevNode =
     tryReadTMVar ref >>= \case
       Nothing -> return (ref, prevNode)
       Just self@(ValueNode spotVal spotTs nxt) ->
         go nxt $
           Just
             self
               { node'value = case prevNode of
                   Nothing -> spotVal
                   Just (ValueNode prevVal prevTs _prevNxt) ->
                     f prevVal prevTs spotVal spotTs
               }

updateValue ::
 forall a m.
 MonadIO m =>
 (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
 ValueSink a ->
 m (ValueSink a)
updateValue f sink = do
 (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
 case tailNode of
   Nothing -> do
     (myVal, myTs) <- f Nothing
     liftIO $
       atomically $ do
         nxt <- newEmptyTMVar
         void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
         return nxt
   Just (ValueNode spotVal spotTs spotNxt) -> do
     (myVal, myTs) <- f $ Just (spotVal, spotTs)
     newNxt <- liftIO newEmptyTMVarIO
     let newTail = ValueNode myVal myTs newNxt

         putAsNewTailOrDiscard :: ValueSink a -> STM ()
         putAsNewTailOrDiscard nodeRef =
           putTMVar nodeRef newTail `orElse` yetOther'sTail
           where
             yetOther'sTail = do
               (ValueNode _other'sVal other'sTs other'sNxt) <-
                 readTMVar nodeRef
               if other'sTs >= myTs
                 then return ()
                 else putAsNewTailOrDiscard other'sNxt

     liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
     return spotNxt
 where
   justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
   justLatest _prevVal _prevTs spotVal _spotTs = spotVal

-- Each concurrent thread is supposed to have its local 'ValueSink' reference
-- "cached" over time, but keep in mind that for any such thread who is slow
-- in unfolding the value stream, the historical values will pile up in heap.


On 2021-09-04, at 18:42, YueCompl <compl.yue@icloud.com> wrote:

I'd like to add a new feature: you can fold over the historic value stream when deriving the new state value. It then becomes:


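{-# LANGUAGE LambdaCase, ScopedTypeVariables #-}
import Control.Concurrent.STM
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)

data ValueNode a = ValueNode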
 { node'value :: a,
   node'timestamp :: Timestamp,
   node'next :: ValueSink a
 }

type ValueSink a = TMVar (ValueNode a)

type Timestamp = Int

seekTail ::
 forall a.
 (a -> Timestamp -> a -> Timestamp -> a) ->
 ValueSink a ->
 STM (ValueSink a, Maybe (ValueNode a))
seekTail f sink = go sink Nothing
 where
   go ::
     ValueSink a ->
     Maybe (ValueNode a) ->
     STM (ValueSink a, Maybe (ValueNode a))
   go ref prevNode =
     tryReadTMVar ref >>= \case
       Nothing -> return (ref, prevNode)
       Just self@(ValueNode spotVal spotTs nxt) ->
         go nxt $
           Just
             self
               { node'value = case prevNode of
                   Nothing -> spotVal
                   Just (ValueNode prevVal prevTs _prevNxt) ->
                     f prevVal prevTs spotVal spotTs
               }

updateValue ::
 forall a m.
 MonadIO m =>
 (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
 ValueSink a ->
 m ()
updateValue f sink = do
 (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
 case tailNode of
   Nothing -> do
     (myVal, myTs) <- f Nothing
     liftIO $
       atomically $ do
         nxt <- newEmptyTMVar
         void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
   Just (ValueNode spotVal spotTs spotNxt) -> do
     (myVal, myTs) <- f $ Just (spotVal, spotTs)
     newNxt <- liftIO newEmptyTMVarIO
     let newTail = ValueNode myVal myTs newNxt

         putAsNewTailOrDiscard :: ValueSink a -> STM ()
         putAsNewTailOrDiscard nodeRef =
           putTMVar nodeRef newTail `orElse` yetOther'sTail
           where
             yetOther'sTail = do
               (ValueNode _other'sVal other'sTs other'sNxt) <-
                 readTMVar nodeRef
               if other'sTs >= myTs
                 then return ()
                 else putAsNewTailOrDiscard other'sNxt

     liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
 where
   justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
   justLatest _prevVal _prevTs spotVal _spotTs = spotVal

-- Each concurrent thread is supposed to have its local 'ValueSink' reference
-- "cached" over time, but keep in mind that for any such thread who is slow
-- in unfolding the value stream, the historical values will pile up in heap.



On 2021-09-03, at 16:26, YueCompl <compl.yue@icloud.com> wrote:

It's a bit sad that I'm not mathematically minded enough to understand you at that abstract level. But I have a more imperative solution in mind, wrt the question:

"server, tell me if there is a value of x newer than t." 

and do further mutate-or-giveup, like this:


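{-# LANGUAGE LambdaCase, ScopedTypeVariables #-}
import Control.Concurrent.STM
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)

data ValueNode a = ValueNode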
 { node'value :: a,
   node'timestamp :: Timestamp,
   node'next :: ValueSink a
 }

type ValueSink a = TMVar (ValueNode a)

type Timestamp = Int

seekTail :: ValueSink a -> STM (ValueSink a, Maybe (ValueNode a))
seekTail sink = go sink Nothing
 where
   go ref ancestor =
     tryReadTMVar ref >>= \case
       Nothing -> return (ref, ancestor)
       Just self@(ValueNode _ _ nxt) -> go nxt $ Just self

updateValue ::
 forall a m.
 MonadIO m =>
 (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
 ValueSink a ->
 m ()
updateValue f sink = do
 (tailRef, tailNode) <- liftIO $ atomically $ seekTail sink
 case tailNode of
   Nothing -> do
     (myVal, myTs) <- f Nothing
     liftIO $
       atomically $ do
         nxt <- newEmptyTMVar
         void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
   Just (ValueNode seenVal seenTs seenNxt) -> do
     (myVal, myTs) <- f $ Just (seenVal, seenTs)
     newNxt <- liftIO newEmptyTMVarIO
     let newTail = ValueNode myVal myTs newNxt

         putAsNewTailOrDiscard :: ValueSink a -> STM ()
         putAsNewTailOrDiscard nodeRef =
           putTMVar nodeRef newTail `orElse` yetOther'sTail
           where
             yetOther'sTail = do
               (ValueNode _other'sVal other'sTs other'sNxt) <-
                 readTMVar nodeRef
               if other'sTs >= myTs
                 then return ()
                 else putAsNewTailOrDiscard other'sNxt

     liftIO $ atomically $ putAsNewTailOrDiscard seenNxt

-- Each concurrent thread is supposed to have its local 'ValueSink' reference
-- "cached" over time, but keep in mind that for any such thread who is slow
-- in unfolding the value stream, the historical values will pile up in heap.



On 2021-09-03, at 01:42, Olaf Klinke <olf@aatal-apotheke.de> wrote:

On Fri, 2021-09-03 at 00:00 +0800, YueCompl wrote:
Um, I'm not sure I understand your case correctly, but if the "mutation", rather than the "mutated result", can be computed (perhaps non-trivially) from a possibly outdated state, and the "mutation" can then be trivially applied, I think `modifyTVar'` is the way to go. `readTVar` can be used to obtain an almost up-to-date state on demand, at low frequency.
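
A minimal sketch of that pattern, assuming the "mutation" is simply an increment on an Int state. The name `applyFromSnapshot` and the `computeDelta` parameter are hypothetical, for illustration only.

```haskell
import Control.Concurrent.STM

-- Hypothetical "mutation": an increment derived from a possibly outdated snapshot.
applyFromSnapshot :: TVar Int -> (Int -> IO Int) -> IO ()
applyFromSnapshot var computeDelta = do
  snapshot <- readTVarIO var               -- may be stale by the time we write
  delta <- computeDelta snapshot           -- slow work happens outside any transaction
  atomically (modifyTVar' var (+ delta))   -- cheap mutation that applies to any state

-- Usage: derive an increment of half the observed value.
demo :: IO Int
demo = do
  var <- newTVarIO 10
  applyFromSnapshot var (\s -> return (s `div` 2))
  readTVarIO var
```

The transaction itself never depends on the snapshot, so it stays short no matter how long computing the mutation takes.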

To be concrete, my state is a collection of time stamped values, where
the monoid operation overwrites old values with new ones. 
But I need to know the current state (x,t) to determine the "mutation",
because I'll be asking questions like "server, tell me if there is a
value of x newer than t." 
Any observer whose initial state is synchronized with the worker thread
can in principle re-construct the worker's internal state by observing
the stream of emitted "mutations". 

The most general abstraction would be that of a monoid action on a
type, but in my case the monoid (mutations) and the mutated type are
identical. 

act :: m -> a -> a
act mempty = id
act (x <> y) = act x . act y -- monoid homomorphism
act (x <> x) = act x         -- idempotent
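
A concrete instance of these laws can be sketched as follows, assuming the state maps each address to its most recent (time stamp, value) pair and the monoid acts on itself by (<>). The `Latest` type, its `String`/`Int`/`Double` components and the tie-breaking rule are hypothetical choices for illustration.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical state: the most recent (time stamp, value) per address.
newtype Latest = Latest (Map.Map String (Int, Double))
  deriving (Eq, Show)

-- Per address, the entry with the larger time stamp wins.
instance Semigroup Latest where
  Latest x <> Latest y = Latest (Map.unionWith newer x y)
    where newer a b = if fst b > fst a then b else a   -- ties keep the left entry

instance Monoid Latest where
  mempty = Latest Map.empty

-- The monoid of mutations and the mutated type coincide.
act :: Latest -> Latest -> Latest
act = (<>)

-- Spot-check the three laws on sample values (not a proof).
lawsHold :: Bool
lawsHold = and
  [ act mempty z == z
  , act (x <> y) z == (act x . act y) z
  , act (x <> x) z == act x z
  ]
  where
    x = Latest (Map.fromList [("a", (1, 10)), ("b", (5, 20))])
    y = Latest (Map.fromList [("a", (3, 11))])
    z = Latest (Map.fromList [("b", (2, 21)), ("c", (4, 30))])
```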

Olaf