
2008/10/9 Claus Reinke
I was wondering if it was possible to implement synchronous channels
within STM. In particular, I'd like to have CSP-like send and recv primitives on a channel that each block until the other side arrives to complete the transaction.
Assuming that retry blocks until something changes, you could associate a channel with a thread that encapsulates the transaction. Somewhat like this?
You don't need an additional channel thread:

module SyncChan (SyncChan, send, recv, newSyncChan) where

import Control.Concurrent.STM
import Control.Monad
import Control.Concurrent

newtype SyncChan a = SC { unSC :: TVar (State a) }

data State a = Ready | Sent a | Received

newSyncChan :: STM (SyncChan a)
newSyncChan = SC `fmap` newTVar Ready

send :: SyncChan a -> a -> IO ()
send (SC chan) x = do
  atomically $ unsafeSend chan x
  atomically $ waitReceiver chan

recv :: SyncChan a -> STM a
recv (SC chan) = do
  s <- readTVar chan
  case s of
    Sent s -> writeTVar chan Received >> return s
    _      -> retry

unsafeSend chan x = do
  s <- readTVar chan
  case s of
    Ready -> writeTVar chan (Sent x)
    _     -> retry

waitReceiver chan = do
  s <- readTVar chan
  case s of
    Received -> writeTVar chan Ready
    _        -> retry

x |> f = fmap f x

test b = do
  (x,y) <- atomically $ liftM2 (,) newSyncChan newSyncChan
  forkIO $ join $ atomically $
    -- since recv is in STM you can wait on multiple channels at the same time
    (recv x |> print) `mplus` (recv y |> print)
  if b then send x 'a' else send y 1

as a bonus you can also try to send to the first available among multiple channels:
(this formulation uses ExistentialQuantification but it's just a convenience)

data Sending a = forall b. Sending (SyncChan b) b a

sendMulti :: [Sending a] -> IO a
sendMulti [] = fail "empty"
sendMulti xs = do
  (m,r) <- atomically $ msum $ map sending xs
  atomically m
  return r

sending :: Sending t -> STM (STM (), t)
sending (Sending (SC chan) x k) = do
  unsafeSend chan x
  return (waitReceiver chan,k)
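
To see sendMulti pick a channel, here is a small self-contained sketch. It repeats the definitions from this message (with type signatures added) so it compiles as a single file. The names demo and main, the MVar used to collect the received value, and the trick of leaving the first channel in the Sent state so that it counts as busy are my own assumptions for illustration, not part of the module above.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (msum)

-- Definitions repeated from the message, with signatures added.
data State a = Ready | Sent a | Received
newtype SyncChan a = SC (TVar (State a))

newSyncChan :: STM (SyncChan a)
newSyncChan = SC `fmap` newTVar Ready

recv :: SyncChan a -> STM a
recv (SC chan) = do
  s <- readTVar chan
  case s of
    Sent v -> writeTVar chan Received >> return v
    _      -> retry

unsafeSend :: TVar (State a) -> a -> STM ()
unsafeSend chan x = do
  s <- readTVar chan
  case s of
    Ready -> writeTVar chan (Sent x)
    _     -> retry

waitReceiver :: TVar (State a) -> STM ()
waitReceiver chan = do
  s <- readTVar chan
  case s of
    Received -> writeTVar chan Ready
    _        -> retry

data Sending a = forall b. Sending (SyncChan b) b a

sendMulti :: [Sending a] -> IO a
sendMulti [] = fail "empty"
sendMulti xs = do
  (m, r) <- atomically $ msum $ map sending xs
  atomically m
  return r

sending :: Sending t -> STM (STM (), t)
sending (Sending (SC chan) x k) = do
  unsafeSend chan x
  return (waitReceiver chan, k)

-- Hypothetical demo: c1 is left holding an unreceived value, so the
-- first alternative retries and sendMulti falls through to c2.
demo :: IO (String, String)
demo = do
  (c1, c2) <- atomically $ (,) <$> newSyncChan <*> newSyncChan
  let SC t1 = c1
  atomically $ unsafeSend t1 (0 :: Int)   -- occupy c1; nobody receives it
  got <- newEmptyMVar
  _ <- forkIO $ atomically (recv c2) >>= putMVar got
  tag <- sendMulti [Sending c1 1 "first", Sending c2 "hello" "second"]
  msg <- takeMVar got
  return (tag, msg)

main :: IO ()
main = demo >>= print   -- prints ("second","hello")
```

The third field of Sending works as a tag telling the caller which send went through. Since msum over STM is a chain of orElse, a channel that is not Ready makes its alternative retry and the next one is tried; if none is Ready the whole transaction blocks.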