
I'm not happy with asynchronous I/O in Haskell. It's hard to reason about, and doesn't compose well. At least in my code.

I'm currently trying to build a networking layer for my application using Network.TLS. Here is a rather minimalist API:

newtype Connection = Connection (TLSCtx Handle)

connectClient :: Handle          -- ^ Connection handle, as returned by 'connectTo'
              -> X509            -- ^ TLS certificate (i.e. public key)
              -> IO Connection

connectServer :: Handle          -- ^ Connection handle, as returned by 'accept'
              -> X509            -- ^ TLS certificate (i.e. public key)
              -> TLS.PrivateKey  -- ^ TLS private key
              -> IO Connection

close :: Connection -> IO ()

sendMessage :: Connection -> Message -> IO ()

recvMessage :: Connection -> ByteString -> IO (Message, ByteString)

The module provides little more than connection initialization and message serialization. I don't try to use locks or STM to multiplex the connection or, in the case of recvMessage, hide connection state. I just make sure to use sendMessage in only one thread at a time, use recvMessage in only one thread at a time, and marshal the "extra bytes" parameter of recvMessage from call to call (with the help of StateT).

I wrote a simple "chat server" to test it. The client turned out okay:

main :: IO ()
main = do
    cert   <- getCertificate
    handle <- connectTo "localhost" (PortNumber 1337)
    conn   <- connectClient handle cert

    _ <- forkIO $ forever $ do
        s <- getLine
        sendMessage conn $ TestMessage s

    forever $ flip runStateT B.empty $ do
        msg <- StateT $ recvMessage conn
        case msg of
            TestMessage s -> liftIO $ putStrLn s
            _ -> liftIO $ hPrintf stderr
                     "Warning: unrecognized message from server: %s\n"
                     (messageTypeName msg)

The only glaring problem is that, if the user presses Ctrl+D, the forked (sending) thread dies, but the main (receiving) thread lingers. I'd have to add exception handlers to ensure that when one thread dies, the other thread dies too.

However, the server is an abomination (see attachment). Unfortunately, it's not as simple as "spawn one thread per client". We need at least two threads: one to listen for messages from the client, and another to send messages to the client. GHC won't let us simultaneously, in the same thread, wait for input from a connection and wait for an STM transaction to succeed.

Another source of complexity: what if we throw an exception at a thread while it is in the middle of sending a packet? Then we can't shut down the connection properly (i.e. Network.TLS.bye), because the receiver might think the close_notify packet is part of the interrupted packet.

Having a thread for each client is good, as it:

* Lets us think about each client separately. No need to turn our code inside out or write one big loop that juggles all the clients.

* Isolates exceptions. If sendMessage or recvMessage throws an exception, it doesn't bring the whole server down.

On the other hand, having multiple threads interact with a single client is hard to think about:

* We have to synchronize the threads (e.g. when one dies, kill the other one).

* There are multiple places where an exception can arise.

* Can multiple threads interact with the connection handle simultaneously?

So why don't I make my connection API handle some of this? Well, I tried. There are so many ways to do it, and I couldn't find a way that simplified usage much. The approach used by Handle and by Network.TLS is to use MVars and IORefs to ensure that, if two threads access the same connection, the connection doesn't become totally corrupt.
If I do the same, then I'll have *three* layers of locking under the hood.

Worse, the locking done by Handle and Network.TLS doesn't guarantee much. I don't know if it's safe to have one thread sending and another thread receiving, especially in the case of Network.TLS, where 'recvData' automatically handshakes in some cases, which sends packets. Since I don't know how much thread safety to expect, I can't write networking code and know for sure that it is safe.

I'm certainly not protected from interleaved data if multiple threads send on the same handle. For example:

import Control.Concurrent
import System.IO

main :: IO ()
main = do
    hSetBuffering stdout NoBuffering
    _ <- forkIO $ putStrLn "One sentence."
    putStrLn "Another sentence."

produces:

AnOonteh esre nsteenntceen.c e.

That is, I can't rely on putStrLn being "atomic". To produce intelligible output (without changing the buffering mode), I have to "lock" the output each time I write something; putStrLn doesn't do it for me.

=== Summary ===

In Haskell, sound logic and a great type system lead to elegant, composable code in a variety of domains, such as:

* Expression evaluation
* Parsing
* Concurrent programming (thanks to STM)

Asynchronous I/O is tricky. However, Haskell currently does little to alleviate the complexity (at least for me).

How can we structure network protocol APIs so that they stack well (e.g. only lock once, rather than locking each layer's connection state)? How can we deal with I/O errors without having to think about them at every turn?

For now, how can I structure my application's communication API so it's less messy to use?

Thanks,
- Joey
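P.S. To make the "lock the output" point concrete, here is a minimal sketch of the kind of wrapper I mean, using nothing beyond base. The names (Locked, mkLocked, lockedPutStrLn) are only illustrative; the point is that every writer goes through the same MVar.

import Control.Concurrent
import Control.Concurrent.MVar
import System.IO

-- A write lock around a handle; withMVar serializes all access.
newtype Locked = Locked (MVar Handle)

mkLocked :: Handle -> IO Locked
mkLocked h = fmap Locked (newMVar h)

-- Every writer goes through the same MVar, so whole lines can't interleave.
lockedPutStrLn :: Locked -> String -> IO ()
lockedPutStrLn (Locked v) s = withMVar v (\h -> hPutStrLn h s)

main :: IO ()
main = do
    hSetBuffering stdout NoBuffering
    out <- mkLocked stdout
    _ <- forkIO $ lockedPutStrLn out "One sentence."
    lockedPutStrLn out "Another sentence."

The same idea would have to be repeated at every layer that owns a handle, which is exactly the stacking problem described above.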

On 01/14/2012 06:24 AM, Joey Adams wrote:
I'm not happy with asynchronous I/O in Haskell. It's hard to reason about, and doesn't compose well. At least in my code.
[--snip--]

Async I/O *is* tricky if you're expecting threads to do their own writes/reads directly to/from sockets. I find that using a message-passing approach for communication makes this much easier.

If you need multiple server threads to respond to the same client (socket), then the easiest approach might be to simply use a (Chan a) for output. Since you always put full messages on the Chan, exceptions cause no problems with respect to partial messages, etc.

You can also use a Chan for forwarding messages from the client socket to the appropriate server threads -- if you need several (or even all) threads to receive messages from the client, you can use "dupChan" on the "input-from-client" channel you pass to the server threads.

So, the API becomes something like:

runSocketServer :: ((Chan a, Chan b) -> IO ()) -> ... -> IO ()

where the first parameter contains the "client logic", "a" is the type of the messages from the client, and "b" is the type of the messages which are sent back to the client.

Hope this helps,
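To sketch what such a wrapper might look like in practice: the fragment below is only an illustration, written against the old network package's Handle-based API, with String lines standing in for real message types and all names (runSocketServer, fromClient, toClient) made up for the example.

import Control.Concurrent
import Control.Concurrent.Chan
import Control.Exception (finally)
import Control.Monad (forever, void)
import Network (PortID(..), accept, listenOn, withSocketsDo)
import System.IO

-- For each accepted client, start a reader thread (socket -> fromClient)
-- and a writer thread (toClient -> socket); the client logic only ever
-- touches the two channels.
runSocketServer :: ((Chan String, Chan String) -> IO ()) -> PortID -> IO ()
runSocketServer logic port = withSocketsDo $ do
    sock <- listenOn port
    forever $ do
        (h, _, _) <- accept sock
        hSetBuffering h LineBuffering
        void $ forkIO $ do
            fromClient <- newChan
            toClient   <- newChan
            reader <- forkIO $ forever (hGetLine h >>= writeChan fromClient)
            writer <- forkIO $ forever (readChan toClient >>= hPutStrLn h)
            logic (fromClient, toClient)
                `finally` (killThread reader >> killThread writer >> hClose h)

A client-logic function then only reads fromClient and writes toClient; an echo server, for instance, is just \(i, o) -> forever (readChan i >>= writeChan o).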

On Sat, Jan 14, 2012 at 1:29 AM, Bardur Arantsson
So, the API becomes something like:
runSocketServer :: ((Chan a, Chan b) -> IO ()) -> ... -> IO ()
where the first parameter contains the "client logic" and "A" is the type of the messages from the client and "B" is the type of the messages which are sent back to the client.
Thanks, that's a good idea. Even if I only plan to receive in one thread, placing the messages in a Chan or TChan helps separate my application thread from the complexities of connection management.

Is there something on Hackage that will do this for me? Or will I need to roll my own? Namely, convert a network connection to a pair of channels, and close the connection automatically. Something like this:

-- | Spawn two threads, one which populates the first channel with messages
-- from the other host, and another which reads the second channel and sends
-- its messages to the other host.
--
-- Run the given computation, passing it these channels. When the computation
-- completes (or throws an exception), sending and receiving will stop, and the
-- connection will be closed.
--
-- If either the receiving thread or the sending thread encounters an exception,
-- sending and receiving will stop, and an asynchronous exception will be
-- thrown to your thread.
channelize :: IO msg_in                                -- ^ Receive callback
           -> (msg_out -> IO ())                       -- ^ Send callback
           -> IO ()                                    -- ^ Close callback
           -> (TChan msg_in -> TChan msg_out -> IO a)  -- ^ Inner computation
           -> IO a
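Rolling my own would presumably look something like the sketch below. It is illustrative only, and it still has the weakness that killing the sender can interrupt a half-sent message, which is the packet-interruption problem from my first mail.

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad (forever)

channelize :: IO msg_in                                -- ^ Receive callback
           -> (msg_out -> IO ())                       -- ^ Send callback
           -> IO ()                                    -- ^ Close callback
           -> (TChan msg_in -> TChan msg_out -> IO a)  -- ^ Inner computation
           -> IO a
channelize recvCb sendCb closeCb inner = do
    inChan  <- newTChanIO
    outChan <- newTChanIO
    caller  <- myThreadId

    -- Forward a worker's failure to the caller, except the ThreadKilled we
    -- deliver ourselves during cleanup.
    let forward (Right _) = return ()
        forward (Left e)  = case fromException e of
            Just ThreadKilled -> return ()
            _                 -> throwTo caller (e :: SomeException)

    receiver <- forkIO $ try (forever $ recvCb >>= atomically . writeTChan inChan) >>= forward
    sender   <- forkIO $ try (forever $ atomically (readTChan outChan) >>= sendCb) >>= forward

    -- However the inner computation ends, stop both workers and close the
    -- connection. Killing the sender can still interrupt a half-sent message.
    inner inChan outChan
        `finally` (killThread receiver >> killThread sender >> closeCb)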

I've been trying to write networking code in Haskell too. I've also
come to the conclusion that channels are the way to go. However,
what's missing in the standard `Chan` type, which is essential for my
use-case, is the ability to do the equivalent of the unix select call.
My other slight qualm is that the type doesn't express the direction
of data (though this is easy to add afterwards).
I know of the chp package, but in order to learn how it worked, I
spent a day writing my own version. I've kept the API similar to that
of the standard Chan's. If it would be useful to you as well, I'll
happily open source it sooner rather than later,
Daniel
P.S. I'd avoid TChan for networking code, as reading from a TChan is
a busy operation. [1]
[1] http://hackage.haskell.org/packages/archive/stm/2.2.0.1/doc/html/src/Control...

Disregard that last comment on `TChan`s; retry blocks. You learn a new
thing every day [=
Daniel
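P.S. With retry-based TChans, the select-like operation I was missing falls out of orElse. A minimal sketch (not taken from any of the libraries mentioned in this thread):

import Control.Concurrent.STM

-- Wait for a value on either channel, whichever becomes ready first.
selectTChan :: TChan a -> TChan b -> STM (Either a b)
selectTChan l r = fmap Left (readTChan l) `orElse` fmap Right (readTChan r)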

Hi Daniel,
I've been trying to write networking code in Haskell too. I've also come to the conclusion that channels are the way to go.
Isn't a tuple of input/output channels essentially the same as a stream processor arrow? I found the example discussed in the "arrow paper" [1] very enlightening in that regard. There is also a Haskell module that extends the SP type to support monadic IO at [2].

Take care,
Peter

[1] http://www.ittc.ku.edu/Projects/SLDG/filing_cabinet/Hughes_Generalizing_Mona...
[2] http://hackage.haskell.org/package/streamproc
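For reference, the stream-processor type from that paper looks roughly like this (paraphrased from memory, so treat it as a sketch rather than a quotation from streamproc). It is indeed close in spirit to an input channel paired with an output channel:

data SP a b = Put b (SP a b)      -- emit an output, then continue
            | Get (a -> SP a b)   -- wait for an input, then continue

-- Serial composition: feed the outputs of the first processor to the second.
compose :: SP a b -> SP b c -> SP a c
compose sp1         (Put c sp2) = Put c (compose sp1 sp2)
compose (Put b sp1) (Get f)     = compose sp1 (f b)
compose (Get g)     sp2         = Get (\a -> compose (g a) sp2)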

Hi Peter,
streamproc is a very interesting package; I'll surely use it somewhere
in the future. However, I'm not convinced that it solves my
immediate problem, though perhaps that is due to my inexperience with
arrows. My problem is:
I have a number of network connections and I have a system that does
things. I want the network connections to interact with the system. I
also want the system to be able to interact with the network
connections by way of a pub/sub style message bus.
The only way I can see stream processors working in this scenario is
if all of the events of the system are handled in a single thread. The
events are then pushed into the stream processor and actions are
pulled out. This isn't acceptable because the amount of logic in the
stream processor will be fairly small for my problem in comparison
with the logic that is required to mux/demux events/actions onto
sockets. It's also a problem that there's a single threaded
bottleneck.
Daniel

On 1/14/12 6:27 AM, Daniel Waterworth wrote:
p.s I'd avoid the TChan for networking code as reading from a TChan is a busy operation. [1]
[1] http://hackage.haskell.org/packages/archive/stm/2.2.0.1/doc/html/src/Control...
The `retry`-ness will be rectified whenever the new version of stm is pushed out [1], which includes tryReadTChan for one-shot use. Until then, you can use the version of tryReadTChan in stm-chans [2], which provides the same operation, though less optimized since it's not behind the API wall.

Once I learn the version number the optimized variants will be released in, the stm-chans version will use CPP to properly select between the new version and the backport, so you can rely on stm-chans to provide a compatibility layer for those operations.

[1] http://www.haskell.org/pipermail/cvs-libraries/2011-April/012914.html
[2] http://hackage.haskell.org/packages/archive/stm-chans/1.1.0/doc/html/src/Con...

--
Live well,
~wren
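The operation in question has the shape tryReadTChan :: TChan a -> STM (Maybe a). A small sketch of how it might be used for non-blocking polling, written against the newer stm API (with an older stm the same function would come from stm-chans):

import Control.Concurrent.STM

-- Drain whatever is currently in the channel without ever blocking.
drainTChan :: TChan a -> STM [a]
drainTChan chan = do
    next <- tryReadTChan chan   -- Nothing if the channel is empty; no retry
    case next of
        Nothing -> return []
        Just x  -> fmap (x :) (drainTChan chan)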

On 01/14/2012 11:42 AM, Joey Adams wrote:
On Sat, Jan 14, 2012 at 1:29 AM, Bardur Arantsson wrote:
So, the API becomes something like:
runSocketServer :: ((Chan a, Chan b) -> IO ()) -> ... -> IO ()
where the first parameter contains the "client logic" and "A" is the type of the messages from the client and "B" is the type of the messages which are sent back to the client.
Thanks, that's a good idea. Even if I only plan to receive in one thread, placing the messages in a Chan or TChan helps separate my application thread from the complexities of connection management.
Unless TCP is an absolute requirement, something like 0MQ [1,2] may be worth investigating. It handles all the nasty details and you get a simple message-based interface with lots of nice things like pub-sub, request-reply, etc. etc.

[1] http://hackage.haskell.org/package/zeromq-haskell-0.8.2
[2] http://www.zeromq.org/

Hi guys,
I'm not happy with asynchronous I/O in Haskell. It's hard to reason about, and doesn't compose well.
Async I/O *is* tricky if you're expecting threads to do their own writes/reads directly to/from sockets. I find that using a message-passing approach for communication makes this much easier.
yes, that is true. I've always felt that spreading IO code all over the software is a choice that makes the programmer's life unnecessarily hard. The (IMHO superior) alternative is to have one central IO loop that generates buffers of input, passes them to a callback function, and receives buffers of output in response. I have attached a short module that implements the following function:

type ByteCount = Word16
type Capacity  = Word16

data Buffer = Buf !Capacity !(Ptr Word8) !ByteCount

type BlockHandler st = Buffer -> st -> IO (Buffer, st)

runLoop :: ReadHandle -> Capacity -> BlockHandler st -> st -> IO st

That setup is ideal for implementing streaming services, where there is only one connection on which some kind of dialog between client/server takes place, e.g. an HTTP server. Programs like Bittorrent, on the other hand, are much harder to design, because there is a great number of seemingly individual I/O contexts (i.e. the machine is talking to hundreds, or even thousands of other machines), but all those communications need to be coordinated in one way or another. A solution for that problem invariably ends up looking like a massive finite state machine, which is somewhat unpleasant.

Take care,
Peter

{-# LANGUAGE DeriveDataTypeable #-}

{- |
   Module      :  BlockIO
   License     :  BSD3
   Maintainer  :  simons@cryp.to
   Stability   :  provisional
   Portability :  DeriveDataTypeable

   'runLoop' drives a 'BlockHandler' with data read from the
   input stream until 'hIsEOF' ensues. Everything else has to be
   done by the callback; runLoop just does the I\/O. But it does
   it /fast/.
-}

module BlockIO where

import Prelude hiding ( catch, rem )
import Control.Exception
import Control.Monad.State
import Data.List
import Data.Typeable
import System.IO
import System.IO.Error hiding ( catch )
import Foreign hiding ( new )
import System.Timeout

-- * Static Buffer I\/O

type ReadHandle  = Handle
type WriteHandle = Handle

type ByteCount = Word16
type Capacity  = Word16

data Buffer = Buf !Capacity !(Ptr Word8) !ByteCount
            deriving (Eq, Show, Typeable)

-- |Run the given computation with an initialized, empty 'Buffer'. The
-- buffer is gone when the computation returns.

withBuffer :: Capacity -> (Buffer -> IO a) -> IO a
withBuffer 0 = fail "BlockIO.withBuffer with size 0 doesn't make sense"
withBuffer n = bracket cons dest
  where
  cons = mallocArray (fromIntegral n) >>= \p -> return (Buf n p 0)
  dest (Buf _ p _) = free p

-- |Drop the first @n <= size@ octets from the buffer.

flush :: ByteCount -> Buffer -> IO Buffer
flush 0 buf               = return buf
flush n (Buf cap ptr len) = assert (n <= len) $ do
  let ptr' = ptr `plusPtr` fromIntegral n
      len' = fromIntegral len - fromIntegral n
  when (len' > 0) (copyArray ptr ptr' len')
  return (Buf cap ptr (fromIntegral len'))

type Timeout = Int

-- |If there is space, read and append more octets; then return the
-- modified buffer. In case of 'hIsEOF', 'Nothing' is returned. If the
-- buffer is full already, 'throwDyn' a 'BufferOverflow' exception. When
-- the timeout exceeds, 'ReadTimeout' is thrown.
slurp :: Timeout -> ReadHandle -> Buffer -> IO (Maybe Buffer)
slurp to h b@(Buf cap ptr len) = do
  when (cap <= len) (throw (BufferOverflow h b))
  timeout to (handleEOF wrap) >>= maybe (throw (ReadTimeout to h b)) return
  where
  wrap = do
    let ptr' = ptr `plusPtr` fromIntegral len
        n    = cap - len
    rc <- hGetBufNonBlocking h ptr' (fromIntegral n)
    if rc > 0
       then return (Buf cap ptr (len + fromIntegral rc))
       else hWaitForInput h (-1) >> wrap

-- * BlockHandler and I\/O Driver

-- |A callback function suitable for use with 'runLoop' takes a buffer
-- and a state, then returns a modified buffer and a modified state.
-- Usually the callback will use 'slurp' to remove data it has processed
-- already.

type BlockHandler st = Buffer -> st -> IO (Buffer, st)

type ExceptionHandler st e = e -> st -> IO st

-- |Our main I\/O driver.

runLoopNB
  :: (st -> Timeout)                 -- ^ user state provides timeout
  -> (SomeException -> st -> IO st)  -- ^ user provides I\/O error handler
  -> ReadHandle                      -- ^ the input source
  -> Capacity                        -- ^ I\/O buffer size
  -> BlockHandler st                 -- ^ callback
  -> st                              -- ^ initial callback state
  -> IO st                           -- ^ return final callback state
runLoopNB mkTO errH hIn cap f initST = withBuffer cap (`ioloop` initST)
  where
  ioloop buf st = buf `seq` st `seq`
    handle (`errH` st) $ do
      rc <- slurp (mkTO st) hIn buf
      case rc of
        Nothing   -> return st
        Just buf' -> f buf' st >>= uncurry ioloop

-- |A variant which won't time out and will just 'throw' all exceptions.

runLoop :: ReadHandle -> Capacity -> BlockHandler st -> st -> IO st
runLoop = runLoopNB (const (-1)) (\e _ -> throw e)

-- * Handler Combinators

-- |Signal how many bytes have been consumed from the /front/ of the
-- list; these octets will be dropped.

type StreamHandler st = [Word8] -> st -> IO (ByteCount, st)

handleStream :: StreamHandler st -> BlockHandler st
handleStream f buf@(Buf _ ptr len) st = do
  (i, st') <- peekArray (fromIntegral len) ptr >>= flip f st
  buf' <- flush i buf
  return (buf', st')

-- * I\/O Exceptions

-- |Thrown by 'slurp'.
data BufferOverflow = BufferOverflow ReadHandle Buffer
                    deriving (Show, Typeable)
instance Exception BufferOverflow where

-- |Thrown by 'slurp'.
data ReadTimeout = ReadTimeout Timeout ReadHandle Buffer
                 deriving (Show, Typeable)
instance Exception ReadTimeout where

-- * Internal Helper Functions

-- |Return 'Nothing' if the given computation throws an 'isEOFError'
-- exception. Used by 'slurp'.

handleEOF :: IO a -> IO (Maybe a)
handleEOF f =
  catchJust fromException (fmap Just f)
    (\e -> if isEOFError e then return Nothing else ioError e)

-- |Our version of C's @strstr(3)@.

strstr :: [Word8] -> [Word8] -> Maybe Int
strstr tok = strstr' 0
  where
  strstr' _ []            = Nothing
  strstr' pos ls@(_:xs)
    | tok `isPrefixOf` ls = Just (pos + length tok)
    | otherwise           = strstr' (pos + 1) xs

-- |Split a list by some delimiter. Will soon be provided by "Data.List".

splitList :: Eq a => [a] -> [a] -> [[a]]
splitList d' l' =
  unfoldr (\x -> if null x
                    then Nothing
                    else Just $ nextToken d' [] (snd $ splitAt (length d') x))
          (d' ++ l')
  where
  nextToken _ r []       = (r, [])
  nextToken d r l@(h:t)
    | d `isPrefixOf` l   = (r, l)
    | otherwise          = nextToken d (r ++ [h]) t
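To make the callback style concrete, here is a small usage sketch (not part of the attached module, just an illustration): a handleStream callback that counts newline bytes on stdin and consumes every chunk it is given.

import BlockIO
import System.IO (stdin)

-- Count newline bytes (10); report that we consumed the whole chunk.
countLines :: StreamHandler Int
countLines octets n =
    return (fromIntegral (length octets), n + length (filter (== 10) octets))

main :: IO ()
main = do
    total <- runLoop stdin 4096 (handleStream countLines) 0
    print total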

I favor a wait-free concurrency model based on the `vat` from E language.
Vats can be modeled very easily in Haskell, and in many other languages. I
currently use such a vat model for my Haskell projects. I describe aspects
of it at a few places:
* http://lambda-the-ultimate.org/node/4289#comment-65886 (vats and methods)
* http://lambda-the-ultimate.org/node/4325#comment-66645 (pipeline method
calls)
* http://awelonblue.wordpress.com/2011/10/06/vat-model-for-rdp/
Unfortunately, my code isn't generic. The vats I've implemented are
specialized (i.e. via extra stages and queues) primarily for efficient
processing of concurrent reactive dataflows.
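To give a flavor of the model before listing its advantages: at its simplest, a vat is just a worker thread draining a queue of actions, so callers never block and vat-local state is only ever touched from one thread. A toy sketch (far simpler than the specialized vats mentioned above):

import Control.Concurrent
import Control.Concurrent.Chan
import Control.Monad (forever)
import Data.IORef

-- A vat: a queue of actions plus one worker thread that runs them in order.
newtype Vat = Vat (Chan (IO ()))

newVat :: IO Vat
newVat = do
    q <- newChan
    _ <- forkIO $ forever (readChan q >>= id)  -- run queued actions sequentially
    return (Vat q)

-- Wait-free asynchronous call: enqueue the action and return immediately.
callAsync :: Vat -> IO () -> IO ()
callAsync (Vat q) = writeChan q

main :: IO ()
main = do
    counter <- newIORef (0 :: Int)    -- state local to the vat below
    vat     <- newVat
    mapM_ (\_ -> callAsync vat (modifyIORef counter (+ 1))) [1 .. 10 :: Int]
    callAsync vat (readIORef counter >>= print)  -- prints 10, no locks needed
    threadDelay 100000                -- crude: let the vat drain before exit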
Advantages of Vats:
* wait-free asynchronous IO, hence deadlock and starvation free
* first-class methods that help in many ways:
** distribute the dispatch burden (no need for a `main` switch statement)
** extensible (easy to add new methods without adjusting central code)
** transparent parallelization (calls may transparently invoke local or
remote methods)
** securable (control distribution and parameter-types of methods)
** easily model code distribution (e.g. create a method with monad action
parameters)
* simple state - variables local to each vat may be shared between methods
* coarse-grained `islands of consistency` are easy to reason about
* implicit batching between vats improves consistency AND efficiency
* easy to express incremental computation (as sequence of method calls)
* clean coarse-grained interaction with data-parallelism (e.g. spark
parameters)
Technically, the vat consistency model is not `composable`. Instead, it
works well based on coarse granularity and the natural limits of local
reasoning - i.e. actual use-cases only interact with one or two other vats
before they extend far enough that developers simply design assuming
arbitrary ordering and potential interference. Developers can build ad-hoc
consistency models atop vats easily enough. I use a temporal consistency
model atop vats, which is composable, but is feasible for my reactive
dataflow model primarily due to its updates being commutative.
That said, one should be careful about asserting transactions are
composable. Transactions don't scale well as they compose, having greater
opportunity for conflict, rework, starvation, priority inversion.
Transactions also don't interact well with the real-world IO, such as
continuous sensor streams or actuators. Despite the non-composability of
vat semantics, they at least scale better than transactions. cf.
http://awelonblue.wordpress.com/2011/07/05/transaction-tribulation/
The recent work on Cloud Haskell takes a similar inspiration from E's vats.
However, the focus of cloud haskell is different and consequently there are
a lot of subtle but important differences between cloud haskell processes
and my use of vats, e.g. regarding process identifiers, serializability
requirements, etc.
* http://research.microsoft.com/~simonpj/papers/parallel/remote.pdf (cloud
haskell)
Regards,
Dave

This is an interesting problem; I think I might incorporate parts of it into the next revision of my Concurrent Haskell tutorial.

It sounds like you're getting overwhelmed by several different problems, and dealing with them separately would probably help. E.g. you want some infrastructure to run two threads and send an exception to one whenever the other one dies. You also want to be able to avoid a thread being interrupted while performing an operation that should be atomic, like sending a message - this is slightly tricky, because there's a tradeoff between keeping the thread responsive and not interrupting an operation. The biggest hammer is uninterruptibleMask, which can be used if all else fails.

Whether Network.TLS supports simultaneous read and write I don't know, but you can examine the code or talk to the maintainer. If it doesn't, adding a layer of locking is straightforward, and doesn't increase overall complexity (it's localised).

Cheers,
Simon
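For concreteness, the first piece of infrastructure mentioned above ("run two threads and send an exception to one whenever the other one dies") might be sketched as follows. This is an illustration only, not code from the tutorial; linkTwo is a made-up name.

import Control.Concurrent
import Control.Exception

-- Run both actions in their own threads; when either one finishes or
-- throws, kill the other. Returns whichever outcome arrived first.
linkTwo :: IO a -> IO b
        -> IO (Either (Either SomeException a) (Either SomeException b))
linkTwo left right = do
    done <- newEmptyMVar
    mask $ \restore -> do
        t1 <- forkIO $ try (restore left)  >>= putMVar done . Left
        t2 <- forkIO $ try (restore right) >>= putMVar done . Right
        result <- takeMVar done
        killThread t1
        killThread t2
        return result

The chat client from the original message could then run its sending and receiving loops as linkTwo senderLoop receiverLoop, so that Ctrl+D ending one loop also ends the other.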

I'd say use of asynchronous exceptions should be a last resort. Developers
should be encouraged to explicitly model any event notification system they
use.
Regards,
Dave

On Tue, Jan 17, 2012 at 3:20 PM, David Barbour
I'd say use of asynchronous exceptions should be a last resort. ...
I agree. However, network libraries in Haskell (e.g. Handle, Network.TLS) generally don't provide the primitives needed to do that on the receiving end. For example, if a thread is blocked on hGetBuf, it cannot also wait on a signal telling it to stop. Since hClose on the same handle will block until the hGetBuf is done, the only way to stop reading from the handle is to throw an asynchronous exception at the hGetBuf thread.

Worse, since there is no threadWaitReadHandle :: Handle -> IO (), there's no way to guarantee that hGetBuf will not be interrupted in the middle of receiving a packet. From an application perspective, this invalidates subsequent retrievals unless the protocol is self-synchronizing.

-Joey

I uploaded a package that creates an STM layer over a network connection:

http://hackage.haskell.org/package/stm-channelize

I haven't used it in anger yet, but I hope it's a step in the right direction. I included a sample chat client and server. The client is pretty cute:

main = let connect = connectTo "localhost" (PortNumber 1234) >>= connectHandle
       in channelize connect $ \conn ->
          channelize connectStdio $ \stdio ->
          forever $ atomically $
              (recv conn >>= send stdio) `orElse` (recv stdio >>= send conn)

I use channelize on both the network connection and on stdin/stdout. The server is much longer, but shouldn't be terribly confusing. It demonstrates kicking out a client without a dangerous asynchronous exception, something we can't do easily without waiting on alternatives (i.e. orElse).

-Joey
participants (7)

- Bardur Arantsson
- Daniel Waterworth
- David Barbour
- Joey Adams
- Peter Simons
- Simon Marlow
- wren ng thornton