STM, Concurrent Haskell and network clients (long, code)

Folks, After two months with Haskell I think I'm starting to get the proper spirit. Inspired by Simon PJ's recent comments about lots of threads not hurting Haskell I introduced STM into my network client. I would like to run my architecture by you to see if there are any improvements to be made. I would like the code to be as bullet-proof as possible. I'm also jumping through some hoops to make sure none of the 3 threads (main, socket reader and socket writer) block indefinitely. I'm getting a "thread blocked indefinitely" exception now and I think this is because the main thread is not getting notified when the socket threads die. My network client has two threads. #1 reads network packets, converts them to events and posts them to a TChan using a function passed to it by the main thread. #2 blocks on a second TChan and waits for events to be sent out through the socket. The main thread creates a TChan to read events from, passes a closure that writes to the TChan to the network client and uses a network client function to send events to the network client. All in all it works out very nicely as there's no networking, FFI, etc. in the network client logic. This is an improvement from what I had before. The function that starts the poker client has this signature: startPokerClient :: (Event -> IO ()) -- fun to send events to the main thread -> HostInfo -- (host, port) -> AuthInfo -- (user, pass, etc.) -> IO PokerClient where data PokerClient = PC !MaybeThread !MaybeThread !(TChan Command) The main network client thread calls this fun to deliver events to the network client: writePokerClient :: PokerClient -> Command -> IO () writePokerClient (PC _ _ chan) cmd = atomically $ writeTChan chan cmd Main thread can stop the network client using stopPokerClient :: PokerClient -> IO () stopPokerClient (PC tmv2 tmv1 _) = do putStrLn $ "stopPokerClient invoked" maybeKillThread tmv2 maybeKillThread tmv1 The abbreviated startPokerClient looks like this.. {-# NOINLINE lock #-} lock :: MVar () lock = unsafePerformIO $ newMVar () startPokerClient post (host, port) auth = do h <- withMVar lock -- getHostByName is not thread-safe $ \_ -> connectTo host (PortNumber $ fromIntegral port) `catchError` (\e -> fail $ "Failed to connect to server: " ++ show e) -- kick off the SSL handshake ssl <- startSSL doSSLHandshake h ssl ... -- we need one thread to always kill the other (tmv1 :: MaybeThread) <- atomically $ newEmptyTMVar (tmv2 :: MaybeThread) <- atomically $ newEmptyTMVar (tmvssl :: MaybeSSL) <- atomically $ newTMVar (Just ssl) -- thread to read from the socket and post commands tid1 <- forkIO $ writeLoop post h ssl `finally` finalize tmv1 tmv2 h tmvssl post -- thread to write to grab commands generated -- by the main thread and write them to the socket chan <- atomically $ newTChan tid2 <- forkIO $ readLoop h chan `finally` finalize tmv2 tmv1 h tmvssl post -- we have both threads now atomically $! do putTMVar tmv1 (Just tid1) putTMVar tmv2 (Just tid2) return $! PC tmv2 tmv1 chan Interestingly enough, I had to use $! otherwise startPokerClient was sitting there for a while, due to due to lazy evaluation I assume. I'm using the following finalizer for my socket reader/writer threads to make sure that if one thread dies the other one is taken with it and that the main thread is notified. I don't have a Maybe around the socket itself since I don't think closing a socket twice has any bad side effects. I put it in a try, though, for good measure. finalize :: MaybeThread -> MaybeThread -> Handle -> MaybeSSL -> (Event -> IO ()) -> IO () finalize me buddy h ssl post = do -- make sure we are not killed again -- killing a dead thread is apparently -- not free of side effects in GHC 6.4.1 atomically $ putTMVar me Nothing -- kill the other thread maybeKillThread buddy maybeFreeSSL ssl try $ hClose h post $ StopScript -- notify main thread return () The rest of the relevant code is below. Please let me know if it can be simplified or made more robust. maybeFreeSSL :: MaybeSSL -> IO () maybeFreeSSL tmv = do putStrLn $ "maybeFreeSSL invoked" mssl <- atomically $ swapTMVar tmv Nothing case mssl of Nothing -> return () Just (ssl, _, _) -> do sslFree ssl sslFree ssl maybeKillThread :: MaybeThread -> IO () maybeKillThread tmv = do putStrLn $ "maybeKillThread invoked " mtid <- atomically $ swapTMVar tmv Nothing case mtid of Nothing -> return () Just tid -> killThread tid --- Socket reader writeLoop :: (Event -> IO ()) -> Handle -> (SSL, BIO, BIO) -> IO () writeLoop post h ssl = do cmd <- read h ssl post $ Cmd cmd writeLoop post h ssl --- Socket writer readLoop :: Handle -> TChan Command -> IO () readLoop h chan = do cmd <- atomically $ readTChan chan write h cmd readLoop h chan --- SSL, etc. not used now but will be used to decrypt later read :: Handle -> (SSL, BIO, BIO) -> IO Command read h _ = do bytes <- hGet h 4 let (size', _) = unpickle endian32 bytes size = fromIntegral $ size' - 4 if size > 0 then do packet <- hGet h size when (size /= length packet) $ fail "Packet size mismatch" unstuff packet else do fail $ "read: size is 0: bytes: " ++ show bytes unstuff :: [Word8] -> IO Command unstuff packet = do let (kind, packet') = unpickle puCmdType packet (cmd', _) = unpickle (puCommand kind) packet' case cmd' of InvalidCommand -> do fail $ "unstuff: Cannot parse: " ++ show packet SrvCompressedCommands sz data' -> do data'' <- uncompress data' $ fromIntegral sz unstuff $ drop 4 data'' _ -> return $! cmd' write :: Handle -> Command -> IO () write h cmd = do let kind = cmdType cmd bytes' = pickle puCmdType kind bytes = bytes' ++ pickle (puCommand kind) cmd when (length bytes <= 0) $ fail $ "write: Could not pack: " ++ show cmd let size = 4 + length bytes size' = pickle endian32 $ fromIntegral size when (length size' <= 0) $ fail $ "write: Could not pack: " ++ show size hPut h (size' ++ bytes) hFlush h -- http://wagerlabs.com/

On Fri, Dec 02, 2005 at 03:20:20PM +0000, Joel Reymont wrote:
The rest of the relevant code is below. Please let me know if it can be simplified or made more robust.
maybeFreeSSL :: MaybeSSL -> IO () maybeFreeSSL tmv = do putStrLn $ "maybeFreeSSL invoked" mssl <- atomically $ swapTMVar tmv Nothing case mssl of Nothing -> return () Just (ssl, _, _) -> do sslFree ssl sslFree ssl
It should suffice to sslFree ssl once ;-) I think you forgot to remove it after testing. Best regards Tomasz -- I am searching for a programmer who is good at least in some of [Haskell, ML, C++, Linux, FreeBSD, math] for work in Warsaw, Poland

Well, that was just to see if anybody is paying attention ;-). On Dec 4, 2005, at 7:08 PM, Tomasz Zielonka wrote:
It should suffice to sslFree ssl once ;-) I think you forgot to remove it after testing.

On Sun, Dec 04, 2005 at 07:30:50PM +0000, Joel Reymont wrote:
Well, that was just to see if anybody is paying attention ;-).
On Dec 4, 2005, at 7:08 PM, Tomasz Zielonka wrote:
It should suffice to sslFree ssl once ;-) I think you forgot to remove it after testing.
Unfortunately, the amount of code you posted was too much for me to digest with the level of interruption I got from my family and cat ;-) I also have to stop thinking about STM for a while, because I caught myself wanting to "retry" in the real world ;-) Best regards Tomasz -- I am searching for a programmer who is good at least in some of [Haskell, ML, C++, Linux, FreeBSD, math] for work in Warsaw, Poland
participants (2)
-
Joel Reymont
-
Tomasz Zielonka