Here's a much simpler implementation for that sort of pattern, using channels to fan out work to threads. I added a dependency on Criterion because getCPUTime is basically useless for this kind of measurement on Mac OS X since it doesn't include the time that the process spent waiting on IO:

{-# Language OverloadedStrings #-}
import System.CPUTime
import Network.Socket hiding(recv)
import Network.Socket.ByteString
import Control.Exception (handle, IOException)
import System.Environment
import Control.Concurrent
import Control.Monad
import Data.Either (partitionEithers)
import Data.List (intercalate)

import Criterion.Measurement (time, time_, secs)

main :: IO ()
main = do
  (host, port, conc, reqs) <- fmap parse getArgs
  putStrLn $ "Connecting to " ++ host ++ " " ++ port
  (servAddr:_) <- getAddrInfo Nothing (Just host) (Just port)
  (diff, results) <- time $ process servAddr conc reqs
  let (errs, succs) = partitionEithers results
      numSuccs = length succs
      numErrs = length errs
      succTime = sum succs
      succAvg = succTime / fromIntegral numSuccs
  putStrLn $ unwords
    [show numSuccs, "successes,", show numErrs, "errors in", secs diff]
  when (numSuccs > 0) $ do
    putStrLn $ "min/max/avg request time: " ++
      intercalate " / " (map secs [minimum succs, maximum succs, succAvg])
    putStrLn $ show (round (fromIntegral reqs / diff) :: Int) ++ " r/s"

parse :: [String] -> (String, String, Int, Int)
parse [h,p,conc,reqs] = (h, p, read conc, read reqs)
parse _ = error "usage client host port concurrency requests"

process :: AddrInfo -> Int -> Int -> IO [Either IOException Double]
process servAddr conc reqs = do
  reqChan <- newChan
  ackChan <- newChan
  let processThread = forever $ do
        _ <- readChan reqChan
        handle (return . Left) (fmap Right socketAction) >>= writeChan ackChan
      socketAction = time_ $ do
        sock <- socket (addrFamily servAddr) Stream defaultProtocol
        connect sock (addrAddress servAddr)
        sendAll sock "GET /\r\n\r\n"
        void $ recv sock 1024
        close sock
  replicateM_ reqs (writeChan reqChan ())
  replicateM_ (min conc reqs) (forkIO processThread)
  replicateM reqs (readChan ackChan)



On Tue, Jan 7, 2014 at 10:55 AM, Branimir Maksimovic <branimir.maksimovic@gmail.com> wrote:
I have test network client, something like apache bench tool.
It uses mvars to synchronize and everything is ok when
compiled without -threaded.
real    0m2.995s
user    0m0.601s
sys    0m2.391s

With -threaded compile option result is following:
real    0m18.196s
user    0m2.054s
sys    0m3.313s

Seems that program is sleeping most of the time for some
reason. I can't explain behavior as it seems that
program is ok. It starts `concurrency` threads which
wait on mvar to process next task.

Program follows:

{-# Language OverloadedStrings #-}
import System.CPUTime
import System.IO
--import System.IO.Error
import Network.Socket hiding(recv)
import Network.Socket.ByteString
import System.Environment
import Control.Concurrent
import Control.Exception

main = do
    n <- getArgs
    let (host,port,conc,reqs) = parse n
    putStrLn $ "Connecting to " ++ host ++ " " ++ port
    s <- getAddrInfo Nothing (Just host) (Just port)
    let servAddr = head s
    begin <- getCPUTime
    process servAddr conc reqs
    end <- getCPUTime
    let diff = (fromIntegral (end - begin))/(10^12) :: Double
    putStrLn $ show (round (fromIntegral reqs / diff)) ++ " r/s"

parse [h,p,conc,reqs] = (h,p,read conc::Int,read reqs::Int)
parse _ = error "usage client host port concurrency requests"

process servAddr conc reqs = do
    let niter = if reqs >= conc then conc else reqs
    putStrLn $ "loop " ++ show niter
    mvars <- initThreads niter []
    putStrLn $ "Initialized " ++ show niter
    let loop n (m:mvs) f | n>0  = do
            flag <- isEmptyMVar m
            if f > length mvars then putStrLn "busy" else return ()
            if flag || f > length mvars
                then do
                    putMVar m ()
                    loop (n-1) mvs 0
                else loop n mvs (f+1)
            | otherwise = return ()
        loop n [] f = if n>0 then loop n mvars f else return ()
    putStrLn $ "length " ++ show (length mvars)
    loop (reqs-niter) mvars 0
    where
        initThreads niter vars | niter > 0 =  do
            mvar <- newMVar ()
            forkIO $ process mvar
            initThreads (niter-1) (mvar:vars)
            | otherwise = return vars
        process mvar = do
            sock <- socket (addrFamily servAddr) Stream defaultProtocol
            connect sock (addrAddress servAddr)
            sendAll sock "Hello World!\n"
            buf <- recv sock 1024
            close sock
            takeMVar mvar
            process mvar

_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe