Different QSem(N) (was Re: IORef vs TVar performance: 6 seconds versus 4 minutes)

I think I can improve on your code.

Bertram Felgenhauer wrote:
But why does it manually manage the waiters at all? MVars are fair, in ghc at least. So this should work:
data Sem = Sem (MVar Int) (MVar Int)
I changed the above to be a data declaration.
newSem :: Int -> IO Sem
newSem initial = liftM2 Sem (newMVar initial) newEmptyMVar

-- | Wait for a unit to become available
waitSem :: Sem -> IO ()
waitSem (Sem sem wakeup) = do
    avail' <- modifyMVar sem (\avail -> return (avail-1, avail-1))
Threads can get out of order at this point. This "order bug" may be undesirable. Also, killing the thread while it waits for "wakeup" below would be bad. You need an exception handler and some kind of cleanup.
    when (avail' < 0) $ takeMVar wakeup >>= putMVar sem
-- | Signal that a unit of the 'Sem' is available
signalSem :: Sem -> IO ()
signalSem (Sem sem wakeup) = do
    avail <- takeMVar sem
    if avail < 0
      then putMVar wakeup (avail+1)
      else putMVar sem (avail+1)
You should change this from "= do" to "= block $ do".
(I should turn this into a library proposal.)
Bertram
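A runnable sketch of the two-MVar semaphore quoted above, with the suggested masking applied to signalSem. Assumptions: it targets a current GHC, where `block` is no longer in base and `mask_` from Control.Exception plays its role. waitSem is left as posted, so it still has the cancellation problem pointed out above.

```haskell
import Control.Concurrent
import Control.Exception (mask_)
import Control.Monad (liftM2, when)

-- 'sem' holds the counter (negative = that many waiters);
-- 'wakeup' passes the counter baton from a signaller to one waiter.
data Sem = Sem (MVar Int) (MVar Int)

newSem :: Int -> IO Sem
newSem initial = liftM2 Sem (newMVar initial) newEmptyMVar

-- Unchanged: a waiter killed while blocked on 'wakeup' is still counted,
-- so a later signal hands the counter to nobody and 'sem' stays empty.
waitSem :: Sem -> IO ()
waitSem (Sem sem wakeup) = do
  avail' <- modifyMVar sem (\avail -> return (avail - 1, avail - 1))
  when (avail' < 0) $ takeMVar wakeup >>= putMVar sem

-- The suggested fix: mask_ (formerly 'block') so an asynchronous exception
-- cannot arrive between taking 'sem' and refilling one of the MVars,
-- unless one of those operations actually blocks.
signalSem :: Sem -> IO ()
signalSem (Sem sem wakeup) = mask_ $ do
  avail <- takeMVar sem
  if avail < 0
    then putMVar wakeup (avail + 1)
    else putMVar sem (avail + 1)
```

Whichever thread runs first, a single waitSem/signalSem pair leaves the counter back at its starting value.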
If you do not need to take N at a time then the untested code below has no "order bug" and is fair.
module Sem where

import Control.Concurrent.MVar
import Control.Monad(when,liftM2)

data Sem = Sem { avail :: MVar Int -- ^ provides fast path and fair queue
               , lock :: MVar () } -- ^ Held while signalling the queue

-- It makes no sense here to initialize with a negative number, so
-- this is treated the same as initializing with 0.
newSem :: Int -> IO Sem
newSem init | init < 1 = liftM2 Sem newEmptyMVar (newMVar ())
            | otherwise = liftM2 Sem (newMVar init) (newMVar ())

waitSem :: Sem -> IO ()
waitSem (Sem sem _) = block $ do
    avail <- takeMVar sem
    when (avail > 1) (signalSemN (pred avail))

signalSem :: Sem -> IO ()
signalSem = signalSemN 1

signalSemN :: Int -> Sem -> IO ()
signalSemN i (Sem sem lock) | i <= 1 = return ()
                            | otherwise = withMVar lock $ \ _ -> block $ do
    old <- tryTakeMVar sem
    case old of
      Nothing -> putMVar sem i
      Just v -> putMVar sem $! succ i
All waitSem calls block in arrival order on the takeMVar in waitSem. Calls to signalSemN avoid conflicting with each other by serializing on the "MVar ()" lock. The above is quite fast so long as the semaphore holds no more than the value 1. Once it holds more than 1, the waiter must take time to add back the remaining value. Note that even though threads are woken up in order, they may still get out of order while blocking on the () lock to add back the remaining value (in the presence of other signalers). The above is also exception safe: the only place it can die is during the takeMVar, and that merely removes a blocked waiter.

I see no way to add a fair waitSemN without changing Sem. But if I change Sem, then I can make a fair waitSemN. The untested code is below:
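The arrival-order argument rests on GHC serving threads blocked on an MVar first-in, first-out. The Control.Concurrent.MVar documentation only promises fairness and calls FIFO the usual implementation, so the following sketch checks the behaviour empirically. It forks waiters one at a time, uses threadStatus from GHC.Conc to wait until each is parked before forking the next, then hands out numbered tickets. waitUntilBlocked and fifoTickets are hypothetical helpers for illustration.

```haskell
import Control.Concurrent
import Control.Monad (forM, forM_)
import GHC.Conc (BlockReason (..), ThreadStatus (..), threadStatus)

-- Spin (yielding) until the given thread is parked on an MVar.
waitUntilBlocked :: ThreadId -> IO ()
waitUntilBlocked tid = do
  st <- threadStatus tid
  if st == ThreadBlocked BlockedOnMVar
    then return ()
    else yield >> waitUntilBlocked tid

-- Fork n waiters on one empty MVar, each parked before the next is forked,
-- then put tickets 1..n. Returns (arrival position, ticket received).
fifoTickets :: Int -> IO [(Int, Int)]
fifoTickets n = do
  gate <- newEmptyMVar
  replies <- forM [1 .. n] $ \i -> do
    reply <- newEmptyMVar
    tid <- forkIO $ do
      ticket <- takeMVar gate
      putMVar reply (i, ticket)
    waitUntilBlocked tid
    return reply
  forM_ [1 .. n] (putMVar gate)
  mapM takeMVar replies
```

On GHC each waiter receives the ticket matching its arrival position.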
module Sem where

import Control.Concurrent.MVar
import Control.Monad(when,liftM3)
import Control.Exception.Base

data Sem = Sem { semWait :: MVar () -- for serializing waiting threads
               , semAvail :: MVar Int -- positive quantity available
               , semSignal :: MVar () -- for serializing signaling threads
               }

newSem i | i<=0 = liftM3 Sem (newMVar ()) newEmptyMVar (newMVar ())
         | otherwise = liftM3 Sem (newMVar ()) (newMVar i) (newMVar ())

waitSem :: Sem -> IO ()
waitSem = waitSemN 1

waitSemN :: Int -> Sem -> IO ()
waitSemN i sem@(Sem w a s) | i<=0 = return ()
                           | otherwise = withMVar w $ \ _ -> block $ do
    let go n = do
          avail <- onException (takeMVar a) (signalSemN (i-n) sem)
          case compare avail n of
            LT -> go $! n-avail
            EQ -> return ()
            GT -> signalSemN (avail-n) sem
    go i

signalSem :: Sem -> IO ()
signalSem = signalSemN 1

signalSemN :: Int -> Sem -> IO ()
signalSemN i (Sem _ a s) | i<=0 = return ()
                         | otherwise = withMVar s $ \ _ -> block $ do
    ma <- tryTakeMVar a
    case ma of
      Nothing -> putMVar a i
      Just v -> putMVar a $! v+i
Trying for exception safety makes the above slightly tricky. It works by allowing only a single thread at a time to hold the semWait lock. This keeps all the arriving threads in the fair blocking queue for the semWait lock. The holder of the semWait lock then nibbles at semAvail's positive value until it is satisfied. Excess value is added back safely with signalSemN.

Cheers,
Chris Kuklewicz
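A runnable sketch of this second design, for readers who want to try it. Assumptions: current GHC, so `block` is replaced by `mask_`, the unused `when` import is dropped and newSem gets a type signature; otherwise the logic follows the posted code. The objection raised in the reply (a blocking withMVar inside signalSemN can still be interrupted) applies to this sketch too.

```haskell
import Control.Concurrent
import Control.Exception (mask_, onException)
import Control.Monad (liftM3)

data Sem = Sem { semWait   :: MVar ()   -- serializes waiting threads
               , semAvail  :: MVar Int  -- positive quantity, or empty
               , semSignal :: MVar ()   -- serializes signalling threads
               }

newSem :: Int -> IO Sem
newSem i
  | i <= 0    = liftM3 Sem (newMVar ()) newEmptyMVar (newMVar ())
  | otherwise = liftM3 Sem (newMVar ()) (newMVar i) (newMVar ())

-- Only the holder of semWait nibbles at semAvail until it has i units.
waitSemN :: Int -> Sem -> IO ()
waitSemN i sem@(Sem w a _)
  | i <= 0    = return ()
  | otherwise = withMVar w $ \_ -> mask_ $ do
      let go n = do
            -- If interrupted here, give back the (i - n) units already taken.
            avail <- takeMVar a `onException` signalSemN (i - n) sem
            case compare avail n of
              LT -> go $! n - avail
              EQ -> return ()
              GT -> signalSemN (avail - n) sem  -- return the excess
      go i

waitSem :: Sem -> IO ()
waitSem = waitSemN 1

signalSemN :: Int -> Sem -> IO ()
signalSemN i (Sem _ a s)
  | i <= 0    = return ()
  | otherwise = withMVar s $ \_ -> mask_ $ do
      ma <- tryTakeMVar a
      case ma of
        Nothing -> putMVar a i
        Just v  -> putMVar a $! v + i

signalSem :: Sem -> IO ()
signalSem = signalSemN 1
```

Taking 3 units from a semaphore holding 5 leaves 2 behind, and a waiter for 3 units on an empty semaphore is released only after three signals.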

ChrisK wrote:
I think I can improve on your code.
Bertram Felgenhauer wrote:
-- | Wait for a unit to become available
waitSem :: Sem -> IO ()
waitSem (Sem sem wakeup) = do
    avail' <- modifyMVar sem (\avail -> return (avail-1, avail-1))
Threads can get out of order at this point.
Is this observable, i.e. distinguishable from the threads entering 'waitSem' in a different order? I think not.
Also, killing the thread while it waits for "wakeup" below would be bad. You need an exception handler and some kind of cleanup.
True. I didn't try for exception safety, mainly because Control.Concurrent.QSem isn't currently exception safe. I would require the caller of waitSem/signalSem to call 'block' if they need exception safety, because outside any 'block', an exception might occur right before or after the semaphore operation - causing tokens (the things that the semaphore counter counts) to get unaccountably lost or created, making exception safety rather meaningless.
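In current Haskell, the usual way for a caller to mask around acquire and release is bracket_ from Control.Exception: it runs the acquire and release actions with asynchronous exceptions masked and runs the release even if the body throws. A minimal sketch using base's Control.Concurrent.QSem so it is self-contained; withQSem is a hypothetical helper name, not a function from base.

```haskell
import Control.Concurrent.QSem
import Control.Exception

-- Hypothetical helper: take one unit, run the action, always give the unit back.
withQSem :: QSem -> IO a -> IO a
withQSem q = bracket_ (waitQSem q) (signalQSem q)
```

Because the release is tied to the acquire, a token can be neither lost nor created by an exception escaping the body.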
If you do not need to take N at a time then the untested code below has no "order bug" and is fair.
module Sem where

import Control.Concurrent.MVar
import Control.Monad(when,liftM2)

data Sem = Sem { avail :: MVar Int -- ^ provides fast path and fair queue
               , lock :: MVar () } -- ^ Held while signalling the queue

-- It makes no sense here to initialize with a negative number, so
-- this is treated the same as initializing with 0.
newSem :: Int -> IO Sem
newSem init | init < 1 = liftM2 Sem newEmptyMVar (newMVar ())
            | otherwise = liftM2 Sem (newMVar init) (newMVar ())

waitSem :: Sem -> IO ()
waitSem (Sem sem _) = block $ do
    avail <- takeMVar sem
    when (avail > 1) (signalSemN (pred avail))
These (pred avail) tokens may be lost if signalSemN blocks on the semaphore lock and an asynchronous exception arrives at that point. (withMVar uses takeMVar internally, and the fact that it's inside 'block' doesn't help: takeMVar is a blocking operation, and a thread blocked in it can still receive asynchronous exceptions.)
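The point about withMVar can be checked directly: takeMVar is an interruptible operation, so an asynchronous exception still reaches a thread blocked in it even while exceptions are masked. The sketch below uses mask_, the current replacement for block; blockedTakeIsInterruptible is a hypothetical name.

```haskell
import Control.Concurrent
import Control.Exception

-- Fork a thread that blocks in takeMVar *inside* mask_, kill it, and report
-- whether the ThreadKilled exception was delivered anyway.
blockedTakeIsInterruptible :: IO Bool
blockedTakeIsInterruptible = do
  never <- newEmptyMVar :: IO (MVar ())  -- nobody ever fills this
  started <- newEmptyMVar
  result <- newEmptyMVar
  tid <- forkIO $ do
    r <- try (mask_ (putMVar started () >> takeMVar never)) :: IO (Either AsyncException ())
    putMVar result (either (const True) (const False) r)
  takeMVar started
  killThread tid  -- returns once the target thread has received the exception
  takeMVar result
```

On GHC this returns True: masking defers exceptions only until the next operation that actually blocks.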
signalSem :: Sem -> IO ()
signalSem = signalSemN 1

signalSemN :: Int -> Sem -> IO ()
signalSemN i (Sem sem lock) | i <= 1 = return ()
                              ^^^^^^ should be i <= 0
                            | otherwise = withMVar lock $ \ _ -> block $ do
    old <- tryTakeMVar sem
    case old of
      Nothing -> putMVar sem i
      Just v -> putMVar sem $! succ i
                               ^^^^^^ should be old + i
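For reference, a sketch of the first design with these two corrections folded in (guard `i <= 0`, new value `v + i`), plus the `sem` argument that the posted call `signalSemN (pred avail)` omits. It assumes current GHC and uses mask_ in place of block. It does not address the interruptible withMVar problem described above, so the give-back in waitSem can still lose tokens if the thread is killed while waiting for the lock.

```haskell
import Control.Concurrent
import Control.Exception (mask_)
import Control.Monad (liftM2, when)

data Sem = Sem { avail :: MVar Int  -- fast path and fair queue of waiters
               , lock  :: MVar ()   -- held while signalling
               }

newSem :: Int -> IO Sem
newSem n
  | n < 1     = liftM2 Sem newEmptyMVar (newMVar ())
  | otherwise = liftM2 Sem (newMVar n) (newMVar ())

waitSem :: Sem -> IO ()
waitSem sem = mask_ $ do
  n <- takeMVar (avail sem)
  -- Put back what this waiter does not need (passes 'sem', which the post omitted).
  when (n > 1) (signalSemN (n - 1) sem)

signalSem :: Sem -> IO ()
signalSem = signalSemN 1

signalSemN :: Int -> Sem -> IO ()
signalSemN i (Sem a lk)
  | i <= 0    = return ()                  -- corrected from i <= 1
  | otherwise = withMVar lk $ \_ -> mask_ $ do
      old <- tryTakeMVar a
      case old of
        Nothing -> putMVar a i
        Just v  -> putMVar a $! v + i       -- corrected from succ i
```

With the original `i <= 1` guard, signalSem would be a no-op and a waitSem on an empty semaphore would never return.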
I see no way to add a fair waitSemN without changing Sem. But if I change Sem then I can make a fair waitSemN. The untested code is below:
signalSemN :: Int -> Sem -> IO ()
signalSemN i (Sem _ a s) | i<=0 = return ()
                         | otherwise = withMVar s $ \ _ -> block $ do
Same as above: exceptions may creep in at the withMVar, so the signalSemN call from waitSemN may fail.
    ma <- tryTakeMVar a
    case ma of
      Nothing -> putMVar a i
      Just v -> putMVar a $! v+i
Trying for exception safety makes the above slightly tricky.
Indeed. I need to think about this some more.
Cheers, Chris Kuklewicz
Bertram
participants (2)
- Bertram Felgenhauer
- ChrisK