Fwd: Conduit and pipelined protocol processing using a threadpool

Forwarding to maillist:
Hi, Nicolas.
Just my 2 cents.
You can try to 'break' your server side application in two parts,
reader and writer then you can have:
main = do
ch1 <- ... -- inbound channel of type (message ,outchannel)
replicateM n $ forkIO $ sourceTBMChan ch1 $$ loop1
-- ^^ you can move this code into myApp if you need per client workers
runTCPServer config $ \ad -> do
ch2 <- ... -- outbound channel
{- now will start 2 threads for reader and writer)
bracket (forkIO $ sourceTBMChan ch2 $= appSource ad)
(\t -> threadDestroy t >> atomically (closeTBMChan ch2))
(const $ appSource ad $$ loop2 ch2)
where loop1 = do
mm <- await
case mm of
Just (m, out) -> process m >>= atomically (writeTBMChan out) >> loop1
Nothing -> return ()
{- using conduit-stm in loop2 will fail as it will close channel
at the end of the processing -}
loop2 ch = await >>= \x -> when (isJust x) (writeTBMChan ch
(fromJust x, ch) >> loop2 ch)
I have not checked this code, but the same approach works 2 projects.
--
Alexander Vershilov
On 28 November 2012 09:21, Nicolas Trangez
On Wed, 2012-11-28 at 09:17 +0200, Michael Snoyman wrote:
On Tue, Nov 27, 2012 at 7:25 PM, Nicolas Trangez
wrote: Michael, On Tue, 2012-11-27 at 17:14 +0200, Michael Snoyman wrote: > I think the stm-conduit package[1] may be helpful for this use case. > Each time you get a new command, you can fork a thread and give it the > TBMChan to write to, and you can use sourceTBMChan to get a source to > send to the client.
That's +- what I had in mind. I did find stm-conduit before and did try to get the thing working using it, but these attempts failed.
I attached an example which might clarify what I intend to do. I'm aware it contains several potential bugs (leaking threads etc), but that's beside the question ;-)
If only I could figure out what to put on the 3 lines of comment I left in there...
Thanks for your help,
Nicolas
The issue is that you're trying to put everything into a single Conduit, which forces reading and writing to occur in a single thread of execution. Since you want your writing to be triggered by a separate event (data being available on the Chan), you're running into limitations.
The reason network-conduit provides a Source for the incoming data and a Sink for outgoing data is specifically to address your use case. You want to take the data from the Source and put it into the Chan in one thread, and take the data from the other Chan and put it into the Sink in a separate thread. Something like:
myApp appdata = do chan1 <- ... chan2 <- ... replicateM_ 5 $ forkIO $ worker chan1 chan2 forkIO $ appSource appdata $$ sinkTBMChan chan1 sourceTBMChan chan2 $$ appSink appdata
You'll also want to make sure to close chan1 and chan2 to make sure that your threads stop running.
Thanks, I +- figured that out last night. Only thing left is managing the handshake before forking off the workers, if possible (otherwise things become very messy IMHO), but ResumableSources etc might be of use here.
Maybe there's a library somewhere in here...
Thanks a bunch,
Nicolas
_______________________________________________ Haskell-Cafe mailing list Haskell-Cafe@haskell.org http://www.haskell.org/mailman/listinfo/haskell-cafe
-- С уважением, Вершилов Александр ( mail-to: alexander.vershilov@gmail.com ) -- С уважением, Вершилов Александр ( mail-to: alexander.vershilov@gmail.com )
participants (1)
-
Alexander V Vershilov