How to timeout a long running program using rxpython?

629 Views Asked by At

Say I have a long running python function that looks something like this?

import random
import time
from rx import Observable
def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x

I want to be able to set a timeout of 1000ms.

So I'm dong something like, creating an observable and mapping it through the above intense calculation.

a = Observable.repeat(1).map(lambda x: intns(x))

Now for each value emitted, if it takes more than 1000ms I want to end the observable, as soon as I reach 1000ms using on_error or on_completed

a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))

above statement does get timeout, and calls on_error, but it goes on to finish calculating the intense calculation and only then returns to the next statements. Is there a better way of doing this?

The last statement prints the following

8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends

The idea is that if a particular function timesout, i want to be able to continue with my program, and ignore the result.

I was wondering if the intns function was done on a separate thread, I guess the main thread continues execution after timeout, but I still want to stop computing intns function on a thread, or kill it somehow.

5

There are 5 best solutions below

2
On

You can do this partially using threading Although there's no specific way to kill a thread in python, you can implement a method to flag the thread to end.

This won't work if the thread is waiting on other resources (in your case, you simulated a "long" running code by a random wait)

See also Is there any way to kill a Thread in Python?

3
On

This way it works:

import random
import time
import threading
import os

def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x


thr = threading.Thread(target=intns, args=([10]), kwargs={})
thr.start()
st = time.clock();
while(thr.is_alive() == True):
    if(time.clock() - st > 9):
        os._exit(0)
3
On

Here's an example for timeout

import random
import time
import threading

_timeout = 0

def intns(loops=1):
    print('begin')
    processing = 0
    for i in range(loops):
        y = random.randint(5,10)
        time.sleep(y)
        if _timeout == 1:
            print('timedout end')
            return
        print('keep processing')
    return

# this will timeout
timeout_seconds = 10
loops = 10

# this will complete
#timeout_seconds = 30.0
#loops = 1

thr = threading.Thread(target=intns, args=([loops]), kwargs={})
thr.start()
st = time.clock();
while(thr.is_alive() == True):
    if(time.clock() - st > timeout_seconds):
        _timeout = 1

thr.join()
if _timeout == 0:
    print ("completed")
else:
    print ("timed-out")
0
On

The following is a class that can be called using with timeout() :

If the block under the code runs for longer than the specified time, a TimeoutError is raised.

import signal

class timeout:
    # Default value is 1 second (1000ms)
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)
    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
    def __exit__(self, type, value, traceback):
        signal.alarm(0)

# example usage
with timeout() :
    # infinite while loop so timeout is reached
    while True :
        pass

If I'm understanding your function, here's what your implementation would look like:

def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    with timeout() :
        time.sleep(y)
    print('end')
    return x
0
On

You can use time.sleep() and make a while loop for time.clock()