
I was wondering if it was possible to implement synchronous channels within STM. In particular, I'd like to have CSP-like send and recv primitives on a channel that each block until the other side arrives to complete the transaction. I think I've convinced myself that it's not possible, but anyone care to differ? cheers, rog.

roger peppe wrote:
I was wondering if it was possible to implement synchronous channels within STM. In particular, I'd like to have CSP-like send and recv primitives on a channel that each block until the other side arrives to complete the transaction.
I think I've convinced myself that it's not possible, but anyone care to differ?
This sounds similar to what I once tried for a FRP implementation. I decided that what I was trying to do was impossible (at least without recalculating things all the time whenever something retries), and then constructed an IVar implementation which was working very elegantly (before I decided to change the core ideas behind my FRP implementation). As luck would have it, Luke Palmer recently added his own IVar implementation to Hackage, which you may find a use for.
- Jake

I was wondering if it was possible to implement synchronous channels within STM. In particular, I'd like to have CSP-like send and recv primitives on a channel that each block until the other side arrives to complete the transaction.
Assuming that retry blocks until something changes, you could associate a channel with a thread that encapsulates the transaction. Somewhat like this?

    import Control.Concurrent.STM
    import Control.Concurrent

    sender write confirm = do
      atomically $ putTMVar write "hi"
      atomically $ takeTMVar confirm
      putStrLn "sender done"

    receiver read = do
      r <- atomically $ takeTMVar read
      putStrLn $ "receiver done: "++r

    channel read write confirm = atomically $ do
      w <- takeTMVar write
      putTMVar read w
      putTMVar confirm ()

    main = do
      (read,write,confirm) <- atomically $ do
        read <- newEmptyTMVar
        write <- newEmptyTMVar
        confirm <- newEmptyTMVar
        return (read,write,confirm)
      forkIO (channel read write confirm)
      forkIO (threadDelay 3000000 >> receiver read)
      threadDelay 5000000 >> sender write confirm

You might also be interested in
http://hackage.haskell.org/cgi-bin/hackage-scripts/package/chp

Claus

2008/10/9 Claus Reinke
I was wondering if it was possible to implement synchronous channels
within STM. In particular, I'd like to have CSP-like send and recv primitives on a channel that each block until the other side arrives to complete the transaction.
Assuming that retry blocks until something changes, you could associate a channel with a thread that encapsulates the transaction. Somewhat like this?
You don't need an additional channel thread:

    module SyncChan (SyncChan, send, recv, newSyncChan) where

    import Control.Concurrent.STM
    import Control.Monad
    import Control.Concurrent

    newtype SyncChan a = SC { unSC :: TVar (State a) }

    data State a = Ready | Sent a | Received

    newSyncChan :: STM (SyncChan a)
    newSyncChan = SC `fmap` newTVar Ready

    send :: SyncChan a -> a -> IO ()
    send (SC chan) x = do
      atomically $ unsafeSend chan x
      atomically $ waitReceiver chan

    recv :: SyncChan a -> STM a
    recv (SC chan) = do
      s <- readTVar chan
      case s of
        Sent s -> writeTVar chan Received >> return s
        _      -> retry

    unsafeSend chan x = do
      s <- readTVar chan
      case s of
        Ready -> writeTVar chan (Sent x)
        _     -> retry

    waitReceiver chan = do
      s <- readTVar chan
      case s of
        Received -> writeTVar chan Ready
        _        -> retry

    x |> f = fmap f x

    test b = do
      (x,y) <- atomically $ liftM2 (,) newSyncChan newSyncChan
      forkIO $ join $ atomically $
        -- since recv is in STM you can wait on multiple channels at the same time
        (recv x |> print) `mplus` (recv y |> print)
      if b then send x 'a' else send y 1

As a bonus you can also try to send to the first available among multiple channels (this formulation uses ExistentialQuantification but it's just a convenience):

    data Sending a = forall b. Sending (SyncChan b) b a

    sendMulti :: [Sending a] -> IO a
    sendMulti [] = fail "empty"
    sendMulti xs = do
      (m,r) <- atomically $ msum $ map sending xs
      atomically m
      return r

    sending :: Sending t -> STM (STM (), t)
    sending (Sending (SC chan) x k) = do
      unsafeSend chan x
      return (waitReceiver chan,k)

I don't think what you want is possible if both sides are in STM.
Other authors have posted solutions where one side or the other of the
transaction is in I/O, but wholly inside STM it's not possible.
The problem is that in order for synchronization to happen, you need
both sides to be able to communicate with each other. But the whole
point of STM is that each transaction is atomically isolated from
other transactions running concurrently; you can find out what
transactions that committed before you have done, but there is no way
for you to interact with another transaction in-flight.
-- ryan
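To make the isolation argument concrete, here is a minimal sketch that is not from the thread; every name in it (`rendezvousInOneTxn`, `sendTwoTxns`, `receiver`, `runWith` and the two demos) is hypothetical. A sender that publishes a value and waits for the acknowledgement inside a single `atomically` cannot finish, because its `putTMVar` only becomes visible at commit, and it cannot commit before the acknowledgement arrives. Splitting the send into two transactions lets the first one commit.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import System.Timeout (timeout)

-- Hypothetical: publish a value and await the acknowledgement in ONE
-- transaction. The put is invisible to other threads until commit, and the
-- commit needs the acknowledgement, so this can only retry.
rendezvousInOneTxn :: TMVar a -> TMVar () -> a -> STM ()
rendezvousInOneTxn box ack x = putTMVar box x >> takeTMVar ack

-- The same send split into two transactions: the first commits the value,
-- the second waits for the receiver's acknowledgement.
sendTwoTxns :: TMVar a -> TMVar () -> a -> IO ()
sendTwoTxns box ack x = do
  atomically (putTMVar box x)
  atomically (takeTMVar ack)

-- Receiver: take the value and acknowledge it in one transaction.
receiver :: TMVar a -> TMVar () -> IO ()
receiver box ack = atomically (takeTMVar box >> putTMVar ack ())

-- Run a sender against a forked receiver, giving up after 0.2 seconds.
runWith :: (TMVar Char -> TMVar () -> Char -> IO ()) -> IO (Maybe ())
runWith sender = do
  box <- newEmptyTMVarIO
  ack <- newEmptyTMVarIO
  _ <- forkIO (receiver box ack)
  timeout 200000 (sender box ack 'x')

demoOneTxn, demoTwoTxns :: IO (Maybe ())
demoOneTxn  = runWith (\box ack x -> atomically (rendezvousInOneTxn box ack x))
demoTwoTxns = runWith sendTwoTxns
```

With these definitions, `demoOneTxn` should hit the timeout and yield `Nothing`, while `demoTwoTxns` should yield `Just ()`.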
On Wed, Oct 8, 2008 at 11:10 PM, roger peppe
I was wondering if it was possible to implement synchronous channels within STM. In particular, I'd like to have CSP-like send and recv primitives on a channel that each block until the other side arrives to complete the transaction.
I think I've convinced myself that it's not possible, but anyone care to differ?
cheers, rog. _______________________________________________ Haskell-Cafe mailing list Haskell-Cafe@haskell.org http://www.haskell.org/mailman/listinfo/haskell-cafe

On Thu, Oct 9, 2008 at 9:15 AM, Ryan Ingram
I don't think what you want is possible if both sides are in STM. Other authors have posted solutions where one side or the other of the transaction is in I/O, but wholly inside STM it's not possible.
Thanks, that's what I thought, although I wasn't sure of it, being new to both Haskell and STM. Presumably this result means that it's not possible to implement any bounded-buffer-type interface within (rather than on top of) STM. Isn't that a rather serious restriction? cheers, rog.

On Thu, Oct 9, 2008 at 10:50, roger peppe
On Thu, Oct 9, 2008 at 9:15 AM, Ryan Ingram
wrote: I don't think what you want is possible if both sides are in STM. Other authors have posted solutions where one side or the other of the transaction is in I/O, but wholly inside STM it's not possible.
Thanks, that's what I thought, although I wasn't sure of it, being new to both Haskell and STM.
Presumably this result means that it's not possible to implement any bounded-buffer-type interface within (rather than on top of) STM.
Isn't that a rather serious restriction?
Sorry, I come into this discussion late. One-place buffers, or MVars, are indeed implemented over STM in the original paper [1]. Is that what you seek? Copied from the paper, it looks like this:

    type MVar a = TVar (Maybe a)

    newEmptyMVar :: STM (MVar a)
    newEmptyMVar = newTVar Nothing

    takeMVar :: MVar a -> STM a
    takeMVar mv = do
        v <- readTVar mv
        case v of
            Nothing  -> retry
            Just val -> do
                writeTVar mv Nothing
                return val

    putMVar :: MVar a -> a -> STM ()
    putMVar mv val = do
        v <- readTVar mv
        case v of
            Nothing -> writeTVar mv (Just val)
            Just _  -> retry

Again, sorry if I'm missing your point. Note that transactions cannot "block" like threads when they retry: they do block, but when they are unblocked they are retried from the top. This is naturally due to the requirement that a transaction must not be affected by something that happens concurrently.

[1] also goes on to implement buffered, multi-item, multi-cast channels.

[1] Tim Harris, Simon Marlow, Simon Peyton Jones, Maurice Herlihy. Composable Memory Transactions.

cheers, Arnar

On Thu, Oct 9, 2008 at 10:12 AM, Arnar Birgisson
Sorry, I come into this discussion late. One-place buffers, or MVars, are indeed implemented over STM in the original paper [1].
Yes, I should have remembered that. It's ok just as long as there's a buffer there because then there's a consistent *memory* state that can be awaited. But using an unbuffered channel, we need to await the state of another *process*, so we can rendezvous with it, something that is not possible in principle with STM.
Is that what you seek?
It's useful, thanks, but not really what I was originally looking for.
Synchronous channels are generally easier to reason about (fewer states
to deal with).
On Thu, Oct 9, 2008 at 10:19 AM, Andrea Vezzosi
I'd rather say that STM is intended to be used just for building up transactions
It seems to me then that, despite the existence of the very elegant STM, there's still room for a commonly used set of transactions outside of it, with as many as possible within STM, along the lines of the SyncChan snippet you posted earlier. By the way, where does FRP (which I haven't got my head around yet) sit with respect to STM? cheers, rog.

On Thu, Oct 9, 2008 at 12:29, roger peppe
It's useful, thanks, but not really what I was originally looking for. Synchronous channels are generally easier to reason about (fewer states to deal with).
Right, that's very true. Interaction between transactions is naturally forbidden by the I in ACId (small d because that one doesn't really apply to memory transactions), but that does not mean you can't implement bounded buffers as you said. cheers, Arnar
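As an illustration of the point that bounded buffers fit inside STM, here is a minimal sketch in the style of the paper's MVar. It is not taken from the thread or the paper: `BoundedBuf`, `newBoundedBuf`, `putBuf` and `takeBuf` are hypothetical names, and the list representation is chosen for brevity, not speed. Both operations stay in STM because each one only waits on a memory state (buffer full, buffer empty), never on another in-flight transaction.

```haskell
import Control.Concurrent.STM

-- Hypothetical bounded FIFO buffer: a capacity plus a TVar holding the
-- queued items, oldest first.
data BoundedBuf a = BoundedBuf Int (TVar [a])

newBoundedBuf :: Int -> STM (BoundedBuf a)
newBoundedBuf cap = BoundedBuf cap `fmap` newTVar []

-- Blocks (via retry) while the buffer already holds `cap` items.
putBuf :: BoundedBuf a -> a -> STM ()
putBuf (BoundedBuf cap var) x = do
  xs <- readTVar var
  if length xs >= cap
    then retry
    else writeTVar var (xs ++ [x])

-- Blocks (via retry) while the buffer is empty.
takeBuf :: BoundedBuf a -> STM a
takeBuf (BoundedBuf _ var) = do
  xs <- readTVar var
  case xs of
    []       -> retry
    (y : ys) -> writeTVar var ys >> return y
```

Because `putBuf` and `takeBuf` are plain STM actions, they compose with `orElse` like any other transaction. For real use, the stm package's `Control.Concurrent.STM.TBQueue` provides a bounded queue with a more efficient representation.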

roger peppe wrote:
By the way, where does FRP (which I haven't got my head around yet) sit with respect to STM?
Entirely orthogonal. FRP is not generally thought of as (explicitly) threaded at all. It's more declarative than that. It's also supposed to be deterministic (up to the determinism of input events) which STM is not. However an elegant, efficient, implementation of FRP in pure haskell evades us, so people discuss different ways to implement it which use, possibly, concurrency such as STM or otherwise, under the hood. So STM may or may not be a good tool to implement FRP, but at the level that you *use* FRP, any threading should be entirely implicit. Jules

I'd rather say that STM is intended to be used just for building up
transactions, not to model your whole process/thread, simply because in the
latter case your process couldn't have any observable intermediate state, or
put in another way, between any two transactions the information can only go
in one direction.
So, since synchronizing two processes needs bidirectional communication, you
have to use more than one transaction.
In the end you still write processes in IO, but communicate via transactions
built in STM.
It's a good thing to expose your primitives as STM when you can, so that
users can build larger transactions on top of them, but a synchronous send
is not a transaction from the start so there's no choice.
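A small sketch of what exposing primitives in STM buys you (the names `takeEither` and `demoTakeEither` are hypothetical, not from the thread): two blocking takes can be combined into one transaction that waits on both variables, which would not be possible if `takeTMVar` lived in IO.

```haskell
import Control.Concurrent.STM

-- Hypothetical combinator: take from whichever TMVar is full, preferring
-- the left one. orElse runs the second action only if the first retries.
takeEither :: TMVar a -> TMVar b -> STM (Either a b)
takeEither l r = fmap Left (takeTMVar l) `orElse` fmap Right (takeTMVar r)

-- Only the right-hand variable is full, so the left take retries and the
-- right take supplies the result.
demoTakeEither :: IO (Either Int String)
demoTakeEither = do
  l <- newEmptyTMVarIO
  r <- newTMVarIO "from the right"
  atomically (takeEither l r)
```

Here `demoTakeEither` yields `Right "from the right"`. A synchronous send cannot be packaged this way, since it needs at least two commits.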
2008/10/9 roger peppe
On Thu, Oct 9, 2008 at 9:15 AM, Ryan Ingram
wrote: I don't think what you want is possible if both sides are in STM. Other authors have posted solutions where one side or the other of the transaction is in I/O, but wholly inside STM it's not possible.
Thanks, that's what I thought, although I wasn't sure of it, being new to both Haskell and STM.
Presumably this result means that it's not possible to implement any bounded-buffer-type interface within (rather than on top of) STM.
Isn't that a rather serious restriction?
cheers, rog.

On Thu, Oct 9, 2008 at 1:50 AM, roger peppe
On Thu, Oct 9, 2008 at 9:15 AM, Ryan Ingram
wrote: I don't think what you want is possible if both sides are in STM. Other authors have posted solutions where one side or the other of the transaction is in I/O, but wholly inside STM it's not possible.
Thanks, that's what I thought, although I wasn't sure of it, being new to both Haskell and STM.
Presumably this result means that it's not possible to implement any bounded-buffer-type interface within (rather than on top of) STM.
Isn't that a rather serious restriction?
I don't know that it's practically-speaking that serious. One can write it in IO, using STM. I think of CSP as I/O anyway, but perhaps my thinking is flawed and dirty from MPI and Erlang "message passing" :-). Then again, I'm not sure why keeping it in STM is even valuable really. IO gets the job done right?
cheers, rog.

This seemed like an interesting problem, so I whipped together a quick-and-dirty implementation of transactional CML semantics in Haskell using STM.

Example:

    main = do
        forkIO chsThread -- administrative thread that manages communication
        forkIO (synchronize test1 >>= print)
        synchronize test2 >>= print

"synchronize" is like "atomically", except it organizes groups of threads to do communication together before they can exit. Inside of synchronize, you have "readChan" and "writeChan" to communicate across channels. "newChan :: IO (Chan a)" creates a channel to communicate over. You even have "retry" and "orElse"; they are called "mzero" and "mplus", though :)

The code is guaranteed to find an interleaving of any currently blocked processes that allows some subset of them to unblock, if there is one.

Code is at http://ryani.freeshell.org/haskell/CHS.hs

Lots of obvious optimizations present themselves; for one thing, right now, all the computation of the synchronizing threads takes place on the administrative thread. Also, the administrative thread attempts to connect all combinations of blocked threads; only attempting to communicate when a reader and writer meet on a particular channel would be another clear improvement.

-- ryan

On Wed, Oct 8, 2008 at 3:10 PM, roger peppe
I was wondering if it was possible to implement synchronous channels within STM. In particular, I'd like to have CSP-like send and recv primitives on a channel that each block until the other side arrives to complete the transaction.
I think I've convinced myself that it's not possible, but anyone care to differ?
Hi Rog!! (Plan 9/Inferno Rog?)
http://haskell.org/ghc/docs/latest/html/libraries/stm/Control-Concurrent-STM...
<-- isn't that it?
see writeTChan and readTChan. I assume readTChan is synchronous :-).
writeTChan may be asynchronous for all I can tell (haven't looked deeply).
But I did write a concurrent prime sieve with it:
module Main where
{--
Haskell Concurrent and sometimes parallel prime sieve of eratosthenes
cheers, rog.

Hi there,
2008/10/9 David Leimbach
see writeTChan and readTChan. I assume readTChan is synchronous :-). writeTChan may be asynchronous for all I can tell (haven't looked deeply).
writeTChan is asynchronous, i.e. channels in this case are unbounded buffers.
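A quick way to see this, as a minimal sketch (the name `demoTChan` is hypothetical): several writes commit with no reader present, and the values are read back afterwards.

```haskell
import Control.Concurrent.STM
import Control.Monad (replicateM)

demoTChan :: IO [Int]
demoTChan = do
  ch <- newTChanIO
  -- No reader exists yet, but all three writes commit immediately:
  -- writeTChan never retries because the channel is unbounded.
  atomically (mapM_ (writeTChan ch) [1, 2, 3])
  -- The buffered values are read back later, in FIFO order.
  atomically (replicateM 3 (readTChan ch))
```

`demoTChan` yields `[1,2,3]`. Only `readTChan` can block, and only when the channel is empty, so a `TChan` gives no rendezvous between writer and reader.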
But I did write a concurrent prime sieve with it:
I did the same, with the one-place-buffers (the MVars implemented over STM). Be warned that there is no stop condition, this just keeps printing primes forever.

    import Control.Concurrent (forkIO)
    import Control.Concurrent.STM

    -- MVars from the STM paper
    type MVar a = TVar (Maybe a)

    newEmptyMVar :: STM (MVar a)
    newEmptyMVar = newTVar Nothing

    takeMVar :: MVar a -> STM a
    takeMVar mv = do
        v <- readTVar mv
        case v of
            Nothing  -> retry
            Just val -> do
                writeTVar mv Nothing
                return val

    putMVar :: MVar a -> a -> STM ()
    putMVar mv val = do
        v <- readTVar mv
        case v of
            Nothing -> writeTVar mv (Just val)
            Just _  -> retry

    -- Sieve
    forever a = do a; forever a

    pfilter :: Int -> MVar Int -> MVar Int -> IO ()
    pfilter p in_ out = forever $ do
        atomically $ do
            v <- takeMVar in_
            if v `mod` p /= 0
                then putMVar out v
                else return ()

    sieve :: MVar Int -> MVar Int -> IO ()
    sieve in_ out = do
        p <- atomically $ takeMVar in_
        atomically $ putMVar out p
        ch <- atomically $ newEmptyMVar
        forkIO $ pfilter p in_ ch
        sieve ch out

    feeder :: MVar Int -> IO ()
    feeder out = feed' 2
        where
            feed' i = do
                atomically $ putMVar out i
                feed' (i+1)

    printer :: MVar Int -> IO ()
    printer in_ = forever $ do
        v <- atomically $ takeMVar in_
        putStrLn $ show v

    main :: IO ()
    main = do
        in_ <- atomically newEmptyMVar
        out <- atomically newEmptyMVar
        forkIO $ feeder in_
        forkIO $ printer out
        forkIO $ sieve in_ out
        return ()

cheers, Arnar

On Thu, Oct 9, 2008 at 18:10, Arnar Birgisson
But I did write a concurrent prime sieve with it:
I did the same, with the one-place-buffers (the MVars implemented over STM). Be warned that there is no stop condition, this just keeps printing primes forever.
Please forgive me for reposting, but the last one exited quite prematurely :)

    module Main where

    import Control.Concurrent (forkIO)
    import Control.Concurrent.STM
    import System.Environment (getArgs)

    -- MVars from the STM paper
    type MVar a = TVar (Maybe a)

    newEmptyMVar :: STM (MVar a)
    newEmptyMVar = newTVar Nothing

    takeMVar :: MVar a -> STM a
    takeMVar mv = do
        v <- readTVar mv
        case v of
            Nothing  -> retry
            Just val -> do
                writeTVar mv Nothing
                return val

    putMVar :: MVar a -> a -> STM ()
    putMVar mv val = do
        v <- readTVar mv
        case v of
            Nothing -> writeTVar mv (Just val)
            Just _  -> retry

    -- Sieve
    forever a = do a; forever a

    pfilter :: Int -> MVar Int -> MVar Int -> IO ()
    pfilter p in_ out = forever $ do
        atomically $ do
            v <- takeMVar in_
            if v `mod` p /= 0
                then putMVar out v
                else return ()

    sieve :: MVar Int -> MVar Int -> IO ()
    sieve in_ out = do
        p <- atomically $ takeMVar in_
        atomically $ putMVar out p
        ch <- atomically $ newEmptyMVar
        forkIO $ pfilter p in_ ch
        sieve ch out

    feeder :: MVar Int -> IO ()
    feeder out = feed' 2
        where
            feed' i = do
                atomically $ putMVar out i
                feed' (i+1)

    printer :: MVar () -> MVar Int -> Int -> IO ()
    printer stop in_ max = do
        v <- atomically $ takeMVar in_
        putStrLn $ show v
        if v > max
            then atomically $ putMVar stop ()
            else printer stop in_ max

    main :: IO ()
    main = do
        max:_ <- getArgs
        in_ <- atomically newEmptyMVar
        out <- atomically newEmptyMVar
        stop <- atomically newEmptyMVar
        forkIO $ feeder in_
        forkIO $ printer stop out (read max)
        forkIO $ sieve in_ out
        atomically $ takeMVar stop
        return ()

I was wondering if it was possible to implement synchronous channels within STM. In particular, I'd like to have CSP-like send and recv primitives on a channel that each block until the other side arrives to complete the transaction.
I think I've convinced myself that it's not possible, but anyone care to differ?
Not quite the same thing, but you can implement synchronous message passing fairly easily with MVars. Here's a small example that builds coroutines, courtesy of ddarius: http://codepad.org/GwtS6wMj you could modify this slightly to have the receiver release the sender as soon as the message is received, if that's what you wanted. Is this close enough for you, or does it have to be in STM?
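For comparison, a minimal sketch of synchronous message passing with plain MVars from `Control.Concurrent.MVar`. This is not the codepad code linked above; `SyncMVarChan`, `sendSync`, `recvSync` and `demoSync` are hypothetical names, and the sketch assumes a single sender and a single receiver per channel (with several senders, an acknowledgement could be taken by the wrong one).

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

-- Hypothetical channel: one MVar carries the value, the other carries the
-- receiver's acknowledgement back to the sender.
data SyncMVarChan a = SyncMVarChan (MVar a) (MVar ())

newSyncMVarChan :: IO (SyncMVarChan a)
newSyncMVarChan = do
  box <- newEmptyMVar
  ack <- newEmptyMVar
  return (SyncMVarChan box ack)

-- Returns only after the receiver has taken the value.
sendSync :: SyncMVarChan a -> a -> IO ()
sendSync (SyncMVarChan box ack) x = do
  putMVar box x
  takeMVar ack

-- Blocks until a value is offered, then releases the sender.
recvSync :: SyncMVarChan a -> IO a
recvSync (SyncMVarChan box ack) = do
  x <- takeMVar box
  putMVar ack ()
  return x

demoSync :: IO String
demoSync = do
  ch <- newSyncMVarChan
  result <- newEmptyMVar
  _ <- forkIO (recvSync ch >>= putMVar result)
  sendSync ch "hello"
  takeMVar result
```

`demoSync` yields `"hello"`. Both operations live in IO, so unlike an STM `recv` they cannot be combined with `orElse` to wait on several channels at once.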
cheers, rog.
Tim Newsham http://www.thenewsh.com/~newsham/
participants (9): Andrea Vezzosi, Arnar Birgisson, Claus Reinke, David Leimbach, Jake McArthur, Jules Bean, roger peppe, Ryan Ingram, Tim Newsham