Here's a way you can do this without needing to change your `worker` function. There are two steps required:
- Use the `maxtasksperchild` option you can pass to `multiprocessing.Pool` to ensure the worker processes in the pool are restarted after every task execution.
- Wrap your existing worker function in another function, which will call `worker` in a daemon thread, and then wait for a result from that thread for `timeout` seconds. Using a daemon thread is important because processes won't wait for daemon threads to finish before exiting.
If the timeout expires, you exit (or abort, it's up to you) the wrapper function, which ends the task, and because you've set `maxtasksperchild=1`, causes the `Pool` to terminate the worker process and start a new one. This means the background thread doing your real work also gets aborted, because it's a daemon thread and the process it lives in has shut down.
```python
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def worker(x, y, z):
    pass  # Do whatever here

def collectMyResult(result):
    print("Got result {}".format(result))

def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)  # Threads in this pool are daemonic, so they die with the process.
    res = p.apply_async(func, args=args)
    try:
        out = res.get(timeout)  # Wait timeout seconds for func to complete.
        return out
    except multiprocessing.TimeoutError:
        print("Aborting due to timeout")
        raise

if __name__ == "__main__":
    pool = multiprocessing.Pool(maxtasksperchild=1)
    start, end, step = 0, 30, 10  # Example values; use your own range here.
    featureClass = [[1000, k, 1] for k in range(start, end, step)]  # List of arguments
    for f in featureClass:
        abortable_func = partial(abortable_worker, worker, timeout=3)
        pool.apply_async(abortable_func, args=f, callback=collectMyResult)
    pool.close()
    pool.join()
```
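To see the timeout path fire, you can temporarily swap in a worker that deliberately overruns the limit (`sleepy_worker` is a name I've made up for this demo):

```python
import time

def sleepy_worker(x, y, z):
    time.sleep(10)  # Deliberately exceed the 3-second timeout.
    return x + y + z

# Used in place of worker above:
#   abortable_func = partial(abortable_worker, sleepy_worker, timeout=3)
#   pool.apply_async(abortable_func, args=f, callback=collectMyResult)
# Each task prints "Aborting due to timeout" after ~3 seconds, and
# collectMyResult is never called for it.
```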
Any function that times out will raise `multiprocessing.TimeoutError`. Note that this means your callback won't execute when a timeout occurs. If that isn't acceptable, just change the `except` block of `abortable_worker` to return something instead of calling `raise`.
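For example, a minimal sketch of that change (the `ON_TIMEOUT` sentinel is a name I've made up; any value your callback can recognize works):

```python
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool

ON_TIMEOUT = "TIMEOUT"  # Hypothetical sentinel value.

def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        return res.get(timeout)
    except multiprocessing.TimeoutError:
        print("Aborting due to timeout")
        return ON_TIMEOUT  # Returning normally means the callback still runs.

def collectMyResult(result):
    if result == ON_TIMEOUT:
        print("Task timed out")
    else:
        print("Got result {}".format(result))
```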
Also keep in mind that restarting worker processes after every task execution will have a negative impact on the `Pool`'s performance, due to the increased overhead. You should measure that for your use case and see whether the trade-off is worth the ability to abort the work. If it's a problem, you may need to try another approach, like co-operatively interrupting `worker` if it has run too long (sketched below), rather than trying to kill it from the outside. There are many questions on SO that cover this topic.
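For completeness, here's a rough sketch of what co-operative interruption could look like. This is my own illustration, not your code, and it assumes the real work can be split into small units that check a deadline between iterations:

```python
import time

def cooperative_worker(x, y, z, timeout=3):
    deadline = time.monotonic() + timeout
    result = 0
    for i in range(x):        # Stand-in for the real work, done in small pieces.
        result += i * y + z   # ... one unit of work ...
        if time.monotonic() > deadline:
            # The worker notices it has overrun and stops itself, so nothing
            # has to be killed from outside and maxtasksperchild=1 isn't needed.
            return None
    return result
```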