
Here's a much simpler implementation for that sort of pattern, using channels to fan out work to threads. I added a dependency on Criterion because getCPUTime is basically useless for this kind of measurement on Mac OS X since it doesn't include the time that the process spent waiting on IO: {-# Language OverloadedStrings #-} import System.CPUTime import Network.Socket hiding(recv) import Network.Socket.ByteString import Control.Exception (handle, IOException) import System.Environment import Control.Concurrent import Control.Monad import Data.Either (partitionEithers) import Data.List (intercalate) import Criterion.Measurement (time, time_, secs) main :: IO () main = do (host, port, conc, reqs) <- fmap parse getArgs putStrLn $ "Connecting to " ++ host ++ " " ++ port (servAddr:_) <- getAddrInfo Nothing (Just host) (Just port) (diff, results) <- time $ process servAddr conc reqs let (errs, succs) = partitionEithers results numSuccs = length succs numErrs = length errs succTime = sum succs succAvg = succTime / fromIntegral numSuccs putStrLn $ unwords [show numSuccs, "successes,", show numErrs, "errors in", secs diff] when (numSuccs > 0) $ do putStrLn $ "min/max/avg request time: " ++ intercalate " / " (map secs [minimum succs, maximum succs, succAvg]) putStrLn $ show (round (fromIntegral reqs / diff) :: Int) ++ " r/s" parse :: [String] -> (String, String, Int, Int) parse [h,p,conc,reqs] = (h, p, read conc, read reqs) parse _ = error "usage client host port concurrency requests" process :: AddrInfo -> Int -> Int -> IO [Either IOException Double] process servAddr conc reqs = do reqChan <- newChan ackChan <- newChan let processThread = forever $ do _ <- readChan reqChan handle (return . Left) (fmap Right socketAction) >>= writeChan ackChan socketAction = time_ $ do sock <- socket (addrFamily servAddr) Stream defaultProtocol connect sock (addrAddress servAddr) sendAll sock "GET /\r\n\r\n" void $ recv sock 1024 close sock replicateM_ reqs (writeChan reqChan ()) replicateM_ (min conc reqs) (forkIO processThread) replicateM reqs (readChan ackChan) On Tue, Jan 7, 2014 at 10:55 AM, Branimir Maksimovic < branimir.maksimovic@gmail.com> wrote:
I have test network client, something like apache bench tool. It uses mvars to synchronize and everything is ok when compiled without -threaded. real 0m2.995s user 0m0.601s sys 0m2.391s
With -threaded compile option result is following: real 0m18.196s user 0m2.054s sys 0m3.313s
Seems that program is sleeping most of the time for some reason. I can't explain behavior as it seems that program is ok. It starts `concurrency` threads which wait on mvar to process next task.
Program follows:
{-# Language OverloadedStrings #-} import System.CPUTime import System.IO --import System.IO.Error import Network.Socket hiding(recv) import Network.Socket.ByteString import System.Environment import Control.Concurrent import Control.Exception
main = do n <- getArgs let (host,port,conc,reqs) = parse n putStrLn $ "Connecting to " ++ host ++ " " ++ port s <- getAddrInfo Nothing (Just host) (Just port) let servAddr = head s begin <- getCPUTime process servAddr conc reqs end <- getCPUTime let diff = (fromIntegral (end - begin))/(10^12) :: Double putStrLn $ show (round (fromIntegral reqs / diff)) ++ " r/s"
parse [h,p,conc,reqs] = (h,p,read conc::Int,read reqs::Int) parse _ = error "usage client host port concurrency requests"
process servAddr conc reqs = do let niter = if reqs >= conc then conc else reqs putStrLn $ "loop " ++ show niter mvars <- initThreads niter [] putStrLn $ "Initialized " ++ show niter let loop n (m:mvs) f | n>0 = do flag <- isEmptyMVar m if f > length mvars then putStrLn "busy" else return () if flag || f > length mvars then do putMVar m () loop (n-1) mvs 0 else loop n mvs (f+1) | otherwise = return () loop n [] f = if n>0 then loop n mvars f else return () putStrLn $ "length " ++ show (length mvars) loop (reqs-niter) mvars 0 where initThreads niter vars | niter > 0 = do mvar <- newMVar () forkIO $ process mvar initThreads (niter-1) (mvar:vars) | otherwise = return vars process mvar = do sock <- socket (addrFamily servAddr) Stream defaultProtocol connect sock (addrAddress servAddr) sendAll sock "Hello World!\n" buf <- recv sock 1024 close sock takeMVar mvar process mvar
_______________________________________________ Haskell-Cafe mailing list Haskell-Cafe@haskell.org http://www.haskell.org/mailman/listinfo/haskell-cafe