
Hi,

I am new to concurrency in Haskell and I am having trouble implementing the notion of interrupting a thread.

In a new thread, call it waitNotify, I am trying to do the following: pop a number from a stack, wait some number of seconds based on the number popped from the stack, perform some notification, and repeat until there are no more numbers in the stack, at which point we wait for a new number.

These numbers will be supplied interactively by the user from main. When the user supplies a new number, I want to interrupt whatever waiting is happening in waitNotify, insert the number in the proper position in the current stack, and resume waitNotify using the updated stack. Note, here "stack" is just a generalization; it will likely just be a list.

What is the most idiomatic way to capture this sort of behavior in Haskell? My two challenges are the notion of interrupting a thread, and sharing and updating this stack between threads (main and waitNotify).

Thank you

On Sat, 2009-12-26 at 23:11 -0600, Floptical Logic wrote:
1. Hmm. IMHO, maybe you should look at CHP. As far as I remember, writing to and reading from a channel there is blocking.
2. Do you mean a stack (FILO) or a queue (FIFO)?
3. Why not just read lazily from the input?

Regards

MVars are the lowest-level operation for this kind of thing in Haskell, and they're very fast. Anything can be done with MVars, but in some cases you need extra worker threads (cheap in Haskell), and you may even need to kill threads (which is a safe operation in Haskell). CHP is higher level and designed for this sort of complexity, so you might want to look at that. I'll give you the answer I know, which is a low-level MVar answer.

I have *not* tried compiling this code.

    import Control.Concurrent
    import Control.Concurrent.MVar
    import Data.Int
    import System.Time

    type Microseconds = Int64

    -- Current wall-clock time in microseconds
    getSystemTime :: IO Microseconds
    getSystemTime = do
        (TOD sec pico) <- getClockTime
        return $! (fromIntegral sec::Int64) * 1000000 +
                  (fromIntegral pico::Int64) `div` 1000000

    type Stack a = [a]  -- or whatever type you want

    isEmpty :: Stack a -> Bool
    isEmpty [] = True
    isEmpty _ = False

    pop :: Stack a -> (a, Stack a)
    pop (x:xs) = (x, xs)  -- only ever called on a non-empty stack

    data ScheduleInput a = ModifyStack (Stack a -> Stack a)
                         | WaitFor Microseconds
                         | Timeout

    never = maxBound :: Microseconds

    schedule :: MVar (ScheduleInput a) -> MVar a -> Stack a -> IO ()
    schedule inpVar wnVar stack = schedule_ never stack
      where
        -- 'timeout' is the absolute time at which the next value may be popped
        schedule_ timeout stack = do
            now <- getSystemTime
            let tillTimeout = 0 `max` (timeout - now)
            if tillTimeout == 0 && not (isEmpty stack)
                then do
                    let (val, stack') = pop stack
                    putMVar wnVar val
                    schedule_ never stack'
                else do
                    inp <- readMVarWithTimeout (fromIntegral tillTimeout) inpVar
                    case inp of
                        ModifyStack f -> schedule_ timeout (f stack)
                        WaitFor t -> do
                            now <- getSystemTime
                            schedule_ (t+now) stack
                        Timeout -> schedule_ timeout stack

    -- Take the next input, or Timeout if nothing arrives in time
    readMVarWithTimeout :: Int -> MVar (ScheduleInput a) -> IO (ScheduleInput a)
    readMVarWithTimeout timeoutUS inpVar = do
        tid <- forkIO $ do
            threadDelay timeoutUS
            putMVar inpVar Timeout
        inp <- takeMVar inpVar
        killThread tid
        return inp

    waitNotify :: MVar (ScheduleInput Int) -> MVar Int -> IO ()
    waitNotify schInp wnInp = do
        val <- takeMVar wnInp
        ...notify...
        let t = ....
        putMVar schInp $ WaitFor t  -- block input for the specified period

    main = do
        schVar <- newEmptyMVar
        wnVar <- newEmptyMVar
        forkIO $ schedule schVar wnVar []
        forkIO $ waitNotify schVar wnVar
        ...
        -- Modify stack according to user input inside your main IO loop
        putMVar schVar $ ModifyStack $ \stack -> ...

I'm sure this is not exactly what you want, but at least it illustrates how you can achieve anything you like by using MVars + extra worker threads + killing threads (useful for implementing timeouts).

Steve

_______________________________________________
Beginners mailing list
Beginners@haskell.org
http://www.haskell.org/mailman/listinfo/beginners
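The timed read at the heart of the scheduler can be tried in isolation. The following is only a minimal sketch under stated assumptions: the `Input` type, `takeMVarMaybe`, and `demo` are hypothetical names introduced for illustration and do not come from the thread. It shows the fork-and-kill pattern from the message above next to an alternative built on `timeout` from base's `System.Timeout`.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import System.Timeout (timeout)

-- Hypothetical message type for this sketch (not the thread's ScheduleInput).
data Input = UserValue Int | Timeout
  deriving (Eq, Show)

-- Fork-and-kill pattern: a helper thread posts Timeout after the delay
-- and is killed once any message has been received.
readMVarWithTimeout :: Int -> MVar Input -> IO Input
readMVarWithTimeout timeoutUS inpVar = do
  timer <- forkIO $ do
    threadDelay timeoutUS
    putMVar inpVar Timeout
  inp <- takeMVar inpVar
  killThread timer
  return inp

-- Alternative using System.Timeout: Nothing means the wait expired.
takeMVarMaybe :: Int -> MVar a -> IO (Maybe a)
takeMVarMaybe timeoutUS var = timeout timeoutUS (takeMVar var)

-- Exercise both helpers: an expired wait, a delivered value, an expired wait.
demo :: IO (Input, Input, Maybe Input)
demo = do
  var <- newEmptyMVar
  -- Nobody writes within 50 ms, so the timer thread's message is received.
  first <- readMVarWithTimeout 50000 var
  -- A writer delivers a value well before the 2 s limit.
  _ <- forkIO (putMVar var (UserValue 7))
  second <- readMVarWithTimeout 2000000 var
  -- The MVar is empty again, so this wait expires.
  third <- takeMVarMaybe 50000 var
  return (first, second, third)
```

One caveat with the fork-and-kill version: if the timer fires in the short window after a real message has been taken but before `killThread` runs, a stale `Timeout` is left in the MVar for the next reader. The `timeout`-based variant avoids this because it does not post a sentinel value into the shared MVar.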

On Sun, Dec 27, 2009 at 1:47 PM, Stephen Blackheath [to Haskell-Beginners] wrote:
Thanks Stephen. That worked and things are starting to make more sense. The crux of it all was the readMVarWithTimeout function, which does exactly what I want.

Now I want to extend this a bit further to use StateT. I've modified the schedule function quite a bit, and now it has type schedule :: StateT MyState IO (). I keep the MVar actVar in MyState as well as the list (stack) of wait times. However, the type of forkIO is forkIO :: IO () -> IO ThreadId. How do I fork a new thread for schedule even though it is wrapped in the StateT monad?

Thanks again.

At 6:08 PM -0600 12/30/09, Floptical Logic wrote:
You'll want something like this:

    myFork :: StateT MyState IO () -> MyState -> StateT MyState IO ThreadId
    myFork action initialState = liftIO (forkIO (evalStateT action initialState))

Dean
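To see what myFork does to the state, here is a small self-contained sketch. It assumes the `transformers` package is available, and the `MyState` definition, the `child` action, and `demo` are hypothetical stand-ins, since the thread never shows the real `MyState`. The forked action runs on its own copy of the state passed in, so its changes are not visible to the parent.

```haskell
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.State.Strict (StateT, evalStateT, execStateT, get, modify)

-- Hypothetical state for this sketch: just the list of wait times.
newtype MyState = MyState { waitTimes :: [Int] }

-- Run a StateT action in a new thread, starting from the given state.
myFork :: StateT MyState IO () -> MyState -> StateT MyState IO ThreadId
myFork action initialState = liftIO (forkIO (evalStateT action initialState))

-- Fork a child that drops the head of its own copy of the list and reports
-- the remainder through an MVar. Returns (parent's list, child's list).
demo :: IO ([Int], [Int])
demo = do
  resultVar <- newEmptyMVar
  let child :: StateT MyState IO ()
      child = do
        modify (\s -> s { waitTimes = drop 1 (waitTimes s) })
        s <- get
        liftIO (putMVar resultVar (waitTimes s))
  parentFinal <- flip execStateT (MyState [3, 1, 2]) $ do
    s <- get
    _ <- myFork child s
    return ()
  childList <- takeMVar resultVar
  return (waitTimes parentFinal, childList)
```

In this sketch the parent still sees `[3, 1, 2]` after the child has dropped an element from its copy. Anything the two threads really need to share therefore has to travel through something like an MVar, which fits the approach of keeping the MVar inside MyState.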
participants (4)
- Dean Herington
- Floptical Logic
- Maciej Piechotka
- Stephen Blackheath [to Haskell-Beginners]