
Hello, all!

Can somebody please explain what the best way of using CURL with several threads is? I wrote a simple client which tries to authenticate against an HTTP server. When I run this client, it starts to eat memory insanely (and I know this code is far, far away from even being close to being called good).

=========================================================================================
module NTLMTest where

import System.IO
import Network.Curl
import Control.Concurrent
import Control.Concurrent.Chan

type ResponseState = Either Bool String
type RespChannel = Chan ResponseState

delay = 500 * 1000

isResponseOk :: String -> CurlResponse -> ResponseState
isResponseOk username response =
    case respCurlCode response of
        CurlOK -> Left True
        _      -> Right $ username ++ " => " ++ respStatusLine response
                      ++ " :: " ++ (show . respStatus $ response)

checkAuthResponse :: RespChannel -> String -> String -> String -> IO ()
checkAuthResponse state user passwd url = do
    response <- curlGetResponse_ url [CurlHttpAuth [HttpAuthAny], CurlUserPwd $ user ++ ":" ++ passwd]
    writeChan state $ isResponseOk user response
    threadDelay $ delay

runHTTPThread :: RespChannel -> (String,String) -> IO ()
runHTTPThread state (user,passwd) = checkAuthResponse state user passwd url

url = "http://localhost:8082/"

credentials = map (\i -> ("user" ++ show i,"123456")) [1..21]

main = withCurlDo $ do
    chan <- newChan :: IO (RespChannel)
    mapM_ ( \cred -> forkIO $ runHTTPThread chan cred ) credentials
    dumpChannel chan $ length credentials
    main
  where
    dumpChannel :: RespChannel -> Int -> IO ()
    dumpChannel _chan n
        | n == 0    = return ()
        | otherwise = do
            state <- readChan _chan
            case state of
                (Left _)    -> return () --putStrLn "OK"
                (Right err) -> putStrLn err
            dumpChannel _chan $ n-1
=========================================================================================

If I get rid of forkIO, memory usage stops at 40-50 megabytes and doesn't grow any further.

Also, I noticed that (either because of buffering, or maybe something else) results appear on the console much more slowly than if I simply use "wget" in a loop in a shell script. JMeter also reports awesome speed, so the server can authenticate tens of concurrent users per second (thus it's not a server or connection bandwidth issue).

Hopefully, someone could help me in overcoming my ignorance :)

--
Eugene N Dzhurinsky

On Wed, Feb 17, 2010 at 07:34:07PM +0200, Eugene Dzhurinsky wrote:
Hopefully, someone could help me in overcoming my ignorance :)
I realized that I can share the same Chan instance over all invocations in main, and wrap the internal function in withCurlDo to ensure only one IO action gets executed with this library. Finally I've come up with the following code, which however still has some memory leaks. Maybe someone will get an idea of what's wrong below?

=============================================================================================
module NTLMTest where

import System.IO
import Network.Curl
import Control.Concurrent
import Control.Concurrent.Chan

type ResponseState = Either Bool String
type RespChannel = Chan ResponseState

delay = 500 * 1000

isResponseOk :: String -> CurlResponse -> ResponseState
isResponseOk username response =
    case respCurlCode response of
        CurlOK -> Left True
        _      -> Right $ username ++ " => " ++ respStatusLine response
                      ++ " :: " ++ (show . respStatus $ response)

checkAuthResponse :: RespChannel -> String -> String -> String -> IO ()
checkAuthResponse state user passwd url = do
    response <- curlGetResponse_ url [CurlHttpAuth [HttpAuthAny], CurlUserPwd $ user ++ ":" ++ passwd]
    writeChan state $ isResponseOk user response
    threadDelay $ delay

runHTTPThread :: RespChannel -> (String,String) -> IO ()
runHTTPThread state (user,passwd) = checkAuthResponse state user passwd url

url = "http://localhost:8082/"

credentials = map (\i -> ("user" ++ show i,"123456")) [1..21]

main = do
    chan <- newChan :: IO (RespChannel)
    withCurlDo $ invokeThreads chan
  where
    invokeThreads chan = do
        mapM_ ( \cred -> forkIO $ runHTTPThread chan cred ) credentials
        dumpChannel chan $ length credentials
        invokeThreads chan
    dumpChannel :: RespChannel -> Int -> IO ()
    dumpChannel _chan n
        | n == 0    = return ()
        | otherwise = do
            state <- readChan _chan
            case state of
                (Left _)    -> return () --putStrLn "OK"
                (Right err) -> putStrLn err
            dumpChannel _chan $ n-1
=============================================================================================

Thank you in advance!

--
Eugene Dzhurinsky

Hi,

Your code forks off N threads to do HTTP response checking, then waits for the reply (invokeThreads). Each thread (runHTTPThread) calls curlGetResponse and *immediately* sends the answer back down the channel to invokeThreads (checkAuthResponse) -- then waits for half a second before terminating. As soon as the original process (invokeThreads) has all N responses, it forks off N threads again. So if your code manages to process the N requests such that it can do them all in, say, 0.05 seconds, you'll have about ten times as many threads in your system as you intended (because they all hang around for 0.5 seconds after completing their work). I suspect what you intended to do was put that threadDelay call *before* sending back the response, which would prevent this leaking of threads.

Some quick style suggestions: your recursion pattern in dumpChannel is easily replaced with replicateM, and your infinite recursion in invokeThreads could easily become the function "forever". Never recurse directly if a combinator can remove the need :-)

Your code could easily be accomplished in CHP (http://hackage.haskell.org/package/chp). runParMapM would solve your exact problem easily; you could replace your code with:

====
module NTLMTest where

import Control.Monad.Trans (liftIO)
import Control.Applicative ((<$>))
import System.IO
import Network.Curl
import Control.Concurrent.CHP

type ResponseState = Either Bool String

isResponseOk :: String -> CurlResponse -> ResponseState
isResponseOk username response =
    case respCurlCode response of
        CurlOK -> Left True
        _      -> Right $ username ++ " => " ++ respStatusLine response
                      ++ " :: " ++ (show . respStatus $ response)

-- Note: I re-ordered the parameters to this function
checkAuthResponse :: String -> String -> String -> IO ResponseState
checkAuthResponse url user passwd = isResponseOk user <$>
    curlGetResponse_ url [CurlHttpAuth [HttpAuthAny], CurlUserPwd $ user ++ ":" ++ passwd]

url = "http://localhost:8082/"

credentials = map (\i -> ("user" ++ show i,"123456")) [1..21]

main = runCHP_ $ runParMapM (liftIO . uncurry (checkAuthResponse url)) credentials
         >>= mapM (liftIO . either (const $ return ()) putStrLn)
====

That above version will get all the responses in parallel and print them out once they are all done, and is quite short. This isn't what your original code did though -- that read the responses from a channel and printed them as they arrived. The below version is probably the closest CHP version to your original code:

====
module NTLMTest where

import Control.Monad (replicateM_, (<=<))
import Control.Monad.Trans (liftIO)
import Control.Applicative ((<$>))
import System.IO
import Network.Curl
import Control.Concurrent.CHP

type ResponseState = Either Bool String

isResponseOk :: String -> CurlResponse -> ResponseState
isResponseOk username response =
    case respCurlCode response of
        CurlOK -> Left True
        _      -> Right $ username ++ " => " ++ respStatusLine response
                      ++ " :: " ++ (show . respStatus $ response)

-- Note: I re-ordered the parameters to this function
checkAuthResponse :: String -> String -> String -> IO ResponseState
checkAuthResponse url user passwd = isResponseOk user <$>
    curlGetResponse_ url [CurlHttpAuth [HttpAuthAny], CurlUserPwd $ user ++ ":" ++ passwd]

url = "http://localhost:8082/"

credentials = map (\i -> ("user" ++ show i,"123456")) [1..21]

main = runCHP_ $ do
    chan <- anyToOneChannel
    runParallel_ $ dumpChannel (reader chan) :
        map (claim (writer chan) . writeValue <=< liftIO . uncurry (checkAuthResponse url)) credentials
  where
    dumpChannel :: Chanin ResponseState -> CHP ()
    dumpChannel c = replicateM_ (length credentials)
        (readChannel c >>= liftIO . either (const $ return ()) putStrLn)
====

This version runs the dumpChannel procedure in parallel with a thread for each credential that writes the result to a shared channel (claiming it as it does so). Neither of my versions checks the credentials repeatedly like yours does, but you can easily add that in. If you're not a point-free fan (I find it irresistible these days), I can break those solutions down a bit into more functions.

Hope that helps,

Neil.
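
For readers who want to stay with plain Control.Concurrent rather than CHP, the suggestions in the reply above (delay before sending the result, a combinator instead of hand-written recursion) might be sketched roughly as follows. This is only an illustration under stated assumptions: runBatch, fakeCheck and delayMicros are hypothetical names that do not appear in the thread, and fakeCheck stands in for the curlGetResponse_ call so that the example runs without a server.

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM, replicateM_)

-- Half a second, the same pause the original code used.
delayMicros :: Int
delayMicros = 500 * 1000

-- Fork one worker per input and collect exactly one result per worker.
-- The delay sits *before* writeChan, so once the collector has read
-- every result, each worker has nothing left to do but exit.
runBatch :: (a -> IO b) -> [a] -> IO [b]
runBatch check inputs = do
    chan <- newChan
    forM_ inputs $ \x -> forkIO $ do
        result <- check x
        threadDelay delayMicros
        writeChan chan result
    -- replicateM replaces the hand-written counting recursion.
    replicateM (length inputs) (readChan chan)

-- Hypothetical stand-in for the HTTP check: rejects "user3" only.
fakeCheck :: (String, String) -> IO (Either Bool String)
fakeCheck (user, _) = return $
    if user == "user3"
        then Right (user ++ " => rejected")
        else Left True

main :: IO ()
main = replicateM_ 3 $ do   -- use 'forever' here to loop like the original
    results <- runBatch fakeCheck credentials
    mapM_ (either (const (return ())) putStrLn) results
  where
    credentials = [("user" ++ show i, "123456") | i <- [1 .. 5 :: Int]]
```

Because the delay now happens before the write, a new batch cannot start until the previous workers are about to finish, so the number of live threads stays near the number of credentials. The trade-off is that every batch takes at least half a second, and a worker that throws an exception never writes its result, which leaves the collector waiting; real code would need to catch errors inside the worker.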
------------------------------------------------------------------------
_______________________________________________ Haskell-Cafe mailing list Haskell-Cafe@haskell.org http://www.haskell.org/mailman/listinfo/haskell-cafe