
multiprocess - Python: concurrent.futures How to make it cancelable?

Python concurrent.futures and ProcessPoolExecutor provide a neat interface to schedule and monitor tasks. Futures even provide a .cancel() method:

cancel(): Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Unfortunately, in a similar question (concerning asyncio) the answer claims that running tasks are uncancelable, citing this snippet of the documentation, but the docs don't say that; they only say so for calls that are running AND cannot be cancelled.
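
For what it's worth, CPython's implementation does return False from cancel() for any future that has already started (the docstring of Future.cancel notes that a future cannot be cancelled if it is running or has already completed). A minimal check, not part of the original question (sleep_a_bit and the timings are only illustrative), shows the difference between a running and a still-queued future:

import time
from concurrent.futures import ProcessPoolExecutor

def sleep_a_bit(seconds):
    time.sleep(seconds)
    return seconds

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        futures = [pool.submit(sleep_a_bit, 1) for _ in range(4)]
        time.sleep(0.5)                 # give the first task time to start
        print(futures[0].cancel())      # False: already executing in a worker
        print(futures[-1].cancel())     # True: still waiting in the queue, will never run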

Submitting multiprocessing.Event objects to the processes is also not trivially possible (passing one as a parameter, as you would with multiprocessing.Process, raises a RuntimeError).
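
One possible workaround, not from the original post: an Event obtained from multiprocessing.Manager() is a proxy object and, unlike a bare multiprocessing.Event(), it can be pickled and passed to pool workers. Each check is an IPC round trip, so the workers should poll it sparingly; the found parameter and the polling interval below are illustrative assumptions:

import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed

def m_run(partition, found):
    for n, elem in enumerate(partition):
        # polling the Manager proxy is an IPC round trip, so only check occasionally
        if n % 1_000_000 == 0 and found.is_set():
            return False
        if elem == 135135515:
            found.set()                # tell the other workers to stop
            return elem
    return False

if __name__ == "__main__":
    steps = 100000000
    with multiprocessing.Manager() as manager:
        found = manager.Event()        # proxy object; a bare multiprocessing.Event() cannot be pickled
        with ProcessPoolExecutor(max_workers=4) as pool:
            futures = [pool.submit(m_run, range(i * steps, (i + 1) * steps), found)
                       for i in range(4)]
            for f in as_completed(futures):
                if f.result():
                    print(f.result())
                    break

Leaving the with block still waits for the remaining workers, but once found is set they return at their next poll instead of scanning their whole partition.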

What am I trying to do? I want to partition a search space and run a task for every partition, but a single solution is enough and the work is CPU-intensive. Is there a comfortable way to accomplish this that does not offset the gains of using a ProcessPool in the first place?

Example:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    for d in done:
        print(d.result())

    print("---")
    for d in not_done:
        # cancel() returns False because these futures are already running;
        # result() then blocks until each one finishes and returns False
        print("Cancel: "+str(d.cancel()))
        print("Result: "+str(d.result()))


1 Reply


I don't know why concurrent.futures.Future does not have a .kill() method, but you can accomplish what you want by shutting down the process pool with pool.shutdown(wait=False), and killing the remaining child processes by hand.

Create a function for killing child processes:

import signal, psutil

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for process in children:
        process.send_signal(sig)

Run your code until you get the first result, then kill all remaining child processes:

import os
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
    # run 4 tasks with a partition, but only *one* solution is needed
    partition = range(i*steps,(i+1)*steps)
    futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

# Shut down pool
pool.shutdown(wait=False)

# Kill remaining child processes
kill_child_processes(os.getpid())
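
If this pattern is needed in more than one place, it can be bundled into a small helper. first_result below is a hypothetical name that simply packages the wait / shutdown / kill steps from above:

import os
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

def first_result(fn, partitions, max_workers=4, timeout=None):
    # submit one task per partition, return the first finished result,
    # then tear down the pool and any workers that are still busy
    pool = ProcessPoolExecutor(max_workers=max_workers)
    futures = [pool.submit(fn, p) for p in partitions]
    done, _ = wait(futures, timeout=timeout, return_when=FIRST_COMPLETED)
    try:
        return next(iter(done)).result() if done else None
    finally:
        pool.shutdown(wait=False)
        kill_child_processes(os.getpid())   # helper defined earlier in this answer

On Python 3.9+ pool.shutdown(wait=False, cancel_futures=True) additionally cancels tasks that are still queued, but it cannot stop tasks that are already running, so the explicit kill step is still needed.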
