
Hi Haskellers,

I'm implementing a form of intelligent streaming among nodes, so that a process can tell whether there are bottlenecks in sending or receiving data and, for example, increase or decrease its number of worker threads.

For this purpose I planned to use hPutBufNonBlocking and hGetBufNonBlocking with block buffering to detect when the buffer is full. I created hPutStrLn', which tries to write the entire string into the buffer. If it does not fit, the process is notified through a state variable; then the buffer is flushed and the rest of the string is sent with hPutBuf.

After some tinkering here and there, it came out like this:

    hPutStrLn' h str = do
        let bs@(PS ps s l) = BS.pack $ str ++ "\n"
        n <- withForeignPtr ps $ \p -> hPutBufNonBlocking h (p `plusPtr` s) l
        when (n < l) $ do
            error "BUFFER FULLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLL"
            hFlush h
            withForeignPtr ps $ \p -> hPutBuf h (p `plusPtr` (n * sizeOf 'x')) (l - n)
        return ()

The error call is the condition that I expected to detect in the tests. In the real code, that line would just set a state variable that the process reads somewhere else.

The problem is that this routine behaves identically to hPutStrLn. The error condition never happens, and hPutBufNonBlocking sends the complete string every time.

I created a program: https://gist.github.com/agocorona/6568bd61d71ab921ad0c

The example prints the output of the receiver, "hello", continuously. Note that in the complete running code the receiver has a threadDelay of one second, so the send and receive buffers should be full.

That happens on Windows and Linux, in a single process (as in the example) or with two processes on the same machine.

How can the processes detect the congestion?

--
Alberto.
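For readers following along, here is a minimal sketch of the routine described above with the state variable in place of the error call. The name putLineDetecting and the IORef flag are hypothetical, and unsafeUseAsCStringLen is swapped in for the PS pattern match, so this is not the code from the gist. The remainder is written from byte offset n, on the assumption that hPutBufNonBlocking counts bytes. As the later messages in this thread report, the short-write branch may never fire with base-4.8.1.0, because the flush inside hPutBufNonBlocking blocks.

```haskell
import Control.Monad (when)
import qualified Data.ByteString.Char8 as BS
import Data.ByteString.Unsafe (unsafeUseAsCStringLen)
import Data.IORef
import Foreign.Ptr (plusPtr)
import System.IO

-- | Hypothetical variant of hPutStrLn' from the message above.
-- Sets the flag when the non-blocking write accepts fewer bytes than
-- requested, then flushes and sends the rest with a blocking write.
putLineDetecting :: IORef Bool -> Handle -> String -> IO ()
putLineDetecting congested h str =
  -- BS.pack truncates characters above 255, as in the original post.
  unsafeUseAsCStringLen (BS.pack (str ++ "\n")) $ \(p, l) -> do
    n <- hPutBufNonBlocking h p l
    when (n < l) $ do
      writeIORef congested True          -- record the short write
      hFlush h
      hPutBuf h (p `plusPtr` n) (l - n)  -- remainder starts n bytes in
```

A caller would create the flag once with newIORef False and poll it with readIORef from whichever thread decides how many workers to run.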

On Thu, Sep 17, 2015 at 5:50 AM, Alberto G. Corona wrote:
That happens in Windows and Linux, in a single process (like in the example) or within two processes in the same machine.

I'm a little surprised this program works at all on Windows, because the winsock stuff gets torn down when `withSocketsDo $ listenOn port` is done? And you do other socket ops outside of withSocketsDo.

Otherwise, it looks like you check for n < 1, not n < l; you would only detect buffer-full if the entire write failed, not for partial writes.

--
brandon s allbery kf8nh sine nomine associates
allbery.b@gmail.com ballbery@sinenomine.net
unix, openafs, kerberos, infrastructure, xmonad http://sinenomine.net
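To make the difference between the two comparisons concrete, here is a small sketch that classifies the return value of a non-blocking write. The type WriteOutcome and the function classify are hypothetical names used only for illustration. A test of n < 1 catches only the NothingWritten case, whereas n < l also catches Partial.

```haskell
-- | Possible results of asking to write l bytes and having n accepted.
data WriteOutcome
  = Complete          -- all l bytes were accepted
  | Partial Int       -- some bytes accepted; the Int is how many remain
  | NothingWritten    -- no bytes accepted at all
  deriving (Eq, Show)

-- | classify l n: l is the requested length, n the count returned
-- by the non-blocking write.
classify :: Int -> Int -> WriteOutcome
classify l n
  | n >= l    = Complete
  | n <= 0    = NothingWritten
  | otherwise = Partial (l - n)
```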

It apparently works under GHC 7.8.3 and Windows 10. Anyway, I had not noticed that.
I usually insert withSocketsDo too.

On Thu, Sep 17, 2015 at 8:46 AM, Alberto G. Corona wrote:
It apparently works under ghc 7.8.3 and windows 10. anyway I did not noticed it. I usually insert withSocketsDo too.

It is possible that a recent network package changed this, but in the past *all* socket operations had to be under the aegis of *one* withSocketsDo, otherwise any handles, buffers, etc. would become invalid when Winsock was deinitialized.

On 17/09/2015 13:52, Brandon Allbery wrote:
... It is possible that recent network package changed this, but in the past *all* socket operations had to be under the aegis of *one* withSocketsDo, otherwise any handles, buffers, etc. would become invalid when Winsock was deinitialized.
See http://neilmitchell.blogspot.co.uk/2015/02/making-withsocketsdo-unnecessary....

It seems that hPutBufNonBlocking flushes the buffer, and blocks anyway, when it does not have enough space for the next message:

        -- else, we have to flush
        else do debugIO "hPutBuf: flushing first"
                old_buf' <- Buffered.flushWriteBuffer haDevice old_buf
                        -- TODO: we should do a non-blocking flush here

https://tldrify.com/bga

This must be either a bug or a feature that was never implemented. Since the flush uses flushWriteBuffer (https://hackage.haskell.org/package/base-4.8.1.0/docs/src/GHC.IO.BufferedIO....), which blocks, hPutBufNonBlocking does the same as hPutBuf, and the buffer congestion cannot be detected.

I will try to write a new version with flushWriteBuffer0 (https://hackage.haskell.org/package/base-4.8.1.0/docs/src/GHC.IO.BufferedIO....), which does not block.
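One way to observe the blocking described above from outside the library is to put a time limit on the call. The sketch below is only a diagnostic probe, not something proposed in this thread: probeWrite is a hypothetical name, and it relies on System.Timeout.timeout being able to interrupt the stalled write, which may not hold on every platform. If the time limit fires, how much data reached the handle is unknown, so this suits a test harness (like the error call in the first message) rather than production code.

```haskell
import qualified Data.ByteString.Char8 as BS
import Data.ByteString.Unsafe (unsafeUseAsCStringLen)
import System.IO
import System.Timeout (timeout)

-- | Hypothetical probe: try a non-blocking write, but give up after the
-- given number of microseconds. Nothing means the call did not return
-- in time, i.e. it blocked; Just n is the byte count it reported.
probeWrite :: Int -> Handle -> BS.ByteString -> IO (Maybe Int)
probeWrite micros h bs =
  timeout micros $
    unsafeUseAsCStringLen bs $ \(p, l) -> hPutBufNonBlocking h p l
```

Against a socket or pipe whose reader is slow, a Nothing result from a call that is documented as non-blocking would be consistent with the behaviour reported in this message.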

On Thu, Sep 17, 2015 at 10:01 AM, Alberto G. Corona wrote:
since the flush uses flushWriteBuffer https://hackage.haskell.org/package/base-4.8.1.0/docs/src/GHC.IO.BufferedIO...., that blocks, hPutBuffNonBlocking does the same than hPutBuff and the buffer congestion can not be detected.

Hm. I wonder if this is the DynamicLog bug we've been fighting with in xmonad, too. (pipe full -> xmonad locks up, blocked on pipe write)

It could be, since this module is general to any kind of buffered IO.

I came up with the implementation below, which in theory flushes the buffer without blocking:

    hPutBufNonBlocking handle ptr count
      | count == 0 = return 0
      | count <  0 = error "negative chunk size"
      | otherwise  =
          wantWritableHandle "hPutBuf" handle $
            \h_@Handle__{..} -> bufWriteNonBlocking h_ (castPtr ptr) count False

    bufWriteNonBlocking :: Handle__ -> Ptr Word8 -> Int -> Bool -> IO Int
    bufWriteNonBlocking h_@Handle__{..} ptr count can_block =
      seq count $ do  -- strictness hack
        old_buf@Buffer{ bufR=w, bufSize=size } <- readIORef haByteBuffer
        -- print (size,w, count)
        old_buf'@Buffer{ bufR=w', bufSize = size' } <-
          if size - w <= count
            then do
              (written,old_buf') <- Buffered.flushWriteBuffer0 haDevice old_buf
              writeIORef haByteBuffer old_buf'
              print (size , written,w, count)
              print (bufSize old_buf', bufR old_buf')
              return old_buf'
            else return old_buf
        let count'= if size' - w' > count then count else size' - w'
        writeChunkNonBlocking h_ (castPtr ptr) count'
        writeIORef haByteBuffer old_buf'{ bufR = w' + count' }
        return count'

    writeChunkNonBlocking h_@Handle__{..} ptr bytes
      | Just fd <- cast haDevice = RawIO.writeNonBlocking (fd::FD) ptr bytes
      | otherwise = error "Todo: hPutBuf"

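The two size checks in bufWriteNonBlocking can be read in isolation as pure arithmetic. The sketch below models them with a simplified stand-in for GHC's Buffer that tracks only capacity and write position; Buf, needsFlush and accepted are hypothetical names, and the 8192-byte capacity in the usage note is just an example value.

```haskell
-- | Simplified stand-in for GHC's Buffer: capacity and write position only.
data Buf = Buf { capacity :: Int, writePos :: Int }
  deriving (Eq, Show)

-- | Does a request of count bytes trigger a flush first?
-- Mirrors the test "size - w <= count" in the code above.
needsFlush :: Buf -> Int -> Bool
needsFlush (Buf size w) count = size - w <= count

-- | How many of count bytes are accepted with the buffer in this state.
-- Equivalent to "if size' - w' > count then count else size' - w'".
accepted :: Buf -> Int -> Int
accepted (Buf size w) count = min count (size - w)
```

For instance, with a buffer of capacity 8192 whose write position is 8000, a 500-byte request triggers a flush; if the flush drained nothing, only 192 bytes would be accepted, and that short count is the signal the caller is hoping to see as n < l.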
But flushWriteBuffer0,

    flushWriteBuffer0 :: dev -> Buffer Word8 -> IO (Int, Buffer Word8)
    -- | Flush data from the supplied write buffer out to the device
    -- without blocking. Returns the number of bytes written and the
    -- remaining buffer.

should flush as much of the send buffer as possible, without waiting for enough space to become available in the device or on the receiving side to empty the send buffer. Instead it blocks as well (at least with sockets) and waits until the whole send buffer is emptied, just like flushWriteBuffer.

So it is not possible for the application to know whether both buffers are full. It can check whether the send buffer is full before flushing, but the device buffers and the receiving buffer may be empty and the receiving process idle. On the other hand, if the buffer is flushed, then since the flush blocks, the send buffer will appear empty after blocking for some time. So the process can do nothing to detect the congestion condition, and it will be unresponsive to other events.

Can flushWriteBuffer0 and hPutBufNonBlocking be fixed?
-- Alberto.
participants (3)
- Alberto G. Corona
- Brandon Allbery
- james