
Hi,
I'm trying to make an observable (code below) that listens to two ZeroMQ sockets and publishes the received messages on a third. Every message that is received gets an incremented sequence number before it is send on the publishing socket. To keep the current sequence number a state object is passed to the polling function and after handling the message, the polling function is called again with the updated state.
Unfortunately this recursive calling of the polling function creates a space leak. Any suggestions how to fix this?
Note: for brevity I left the incrementing of the sequence number and the sending on the publishing socket out of the code.
As David Feuer pointed out in a private mail, I didn't include all necessary information: - OS: Windows 7 (64-bit) - GHC: 7.8.4 (64-bit) (minghc) - Zeromq4-haskell: 0.6.3 (Stackage LTS 2.4) - ZeroMQ DLL: 4.0.4 (64-bit) The memory usage increases with every message that is received and is probably due to the recursive pollSockets call, since when I remove the recursive pollSockets calls in the observerHandleEvts and snapshotHandleEvts functions and use forever in pollSockets the memory usage stays constant. Kind regards, Martijn Rijkeboer
--- Code ---
module Observable ( run ) where
import Control.Monad (void) import Data.Int (Int64) import System.ZMQ4
data State = State { nextSeqNum :: !Int64 , listenSocket :: !(Socket Pull) , publishSocket :: !(Socket Pub) , snapSocket :: !(Socket Router) }
run :: IO () run = do withContext $ \ctx -> withSocket ctx Pull $ \observer -> withSocket ctx Pub $ \publisher -> withSocket ctx Router $ \snapshot -> do setLinger (restrict (0::Int)) observer bind observer "tcp://*:7010"
setLinger (restrict (0::Int)) publisher bind publisher "tcp://*:7011"
setLinger (restrict (0::Int)) snapshot setSendHighWM (restrict (0::Int)) snapshot bind snapshot "tcp://*:7013"
let state = State { nextSeqNum = 0 , listenSocket = observer , publishSocket = publisher , snapSocket = snapshot }
pollSockets state
pollSockets :: State -> IO () pollSockets state = void $ poll (-1) [ Sock (listenSocket state) [In] (Just $ observerHandleEvts state) , Sock (snapSocket state) [In] (Just $ snapshotHandleEvts state) ]
observerHandleEvts :: State -> [Event] -> IO () observerHandleEvts state _ = do void $ receiveMulti $ listenSocket state pollSockets state
snapshotHandleEvts :: State -> [Event] -> IO () snapshotHandleEvts state _ = do void $ receiveMulti $ snapSocket state pollSockets state