
multithreading - Yielding results from multiple threads (or processes) in Python

Alright, I'm trying to solve a specific kind of problem, and I'm trying to wrap my brain around how I could solve it with asyncio, coroutines, or ThreadPoolExecutors, but I haven't cracked a solution for what I'm trying to do yet.

Generally, I'm trying to run a series of tasks in parallel, retrieve the results from each into some kind of aggregator, and yield them to a caller. It'd be something like this (in pseudo-code):

from multiprocessing.pool import ThreadPool

def worker(host, criteria):
    # Could be getting results from a subprocess, a remote HTTP request,
    # sshing to a remote system, etc. The result set is potentially large,
    # and the operation may take a while before it starts generating results.
    results = do_something(host, criteria)
    for result in results:
        # ...do some processing (filter the result, transform it, etc.)
        yield result

def aggregator(hosts, criteria, numThreads=8):
    # Or some other pool: ThreadPoolExecutor, a process pool, etc.
    pool = ThreadPool(processes=numThreads)
    tasks = []

    for host in hosts:
        task = pool.apply_async(worker, (host, criteria))
        tasks.append(task)

    while tasks:
        for task in tasks:
            # Check for results from the task; if there are any, yield them.
            # As each task completes, remove it from "tasks".
            ...

    pool.close()


for result in aggregator(hosts, criteria):
    print(result)  # Or whatever

The intent here is to parallelize tasks across some workset (a group of hosts, URLs, files, whatever) and provide a single stream of results back to the caller as they become available. The workset could be large, and I potentially don't want to run ALL of it in parallel (mostly for resource reasons), which is why I've tended towards thread or process pools (I know queues could possibly work too). I've alternatively cached results to disk, used callback routines, etc., but those aren't ideal.
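For the coarse-grained version of this pattern, where each worker returns its whole processed result set at once rather than streaming items, concurrent.futures already provides a single stream of completed tasks. A minimal, self-contained sketch of that (the toy worker body is illustrative, not my real do_something):

from concurrent.futures import ThreadPoolExecutor, as_completed

def worker(host, criteria):
    # Stand-in for the real subprocess/HTTP/ssh work; returns a full list.
    return [f"{host}: result {n}" for n in range(3)]

def aggregator(hosts, criteria, numThreads=8):
    with ThreadPoolExecutor(max_workers=numThreads) as pool:
        futures = [pool.submit(worker, host, criteria) for host in hosts]
        # as_completed yields each future as soon as its worker finishes,
        # regardless of submission order.
        for future in as_completed(futures):
            yield from future.result()

for result in aggregator(["alpha", "beta", "gamma"], criteria=None):
    print(result)

The limitation, and part of why I'm asking, is that nothing arrives from a given host until that worker has finished entirely; there's no per-item streaming.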

While actual code along the lines of the pseudo-code above generally works, it ends up executing each task serially, which makes sense: calling a generator function doesn't run its body, it just returns a generator object, so the pool task finishes almost immediately and the real work happens later, one generator at a time, wherever the caller iterates them.
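A small demonstration of that effect, with a toy gen_worker (illustrative names, not my actual code):

from multiprocessing.pool import ThreadPool

def gen_worker(host):
    print(f"started {host}")  # Only runs once the generator is iterated.
    yield f"{host}: result"

pool = ThreadPool(processes=2)
async_results = [pool.apply_async(gen_worker, (h,)) for h in ("alpha", "beta")]
for ar in async_results:
    gen = ar.get()      # The pool task finished almost instantly: it only
                        # created the generator object, it never ran the body.
    for item in gen:    # The actual work happens here, serially, in the caller.
        print(item)
pool.close()
pool.join()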

I've experimented with using a queue to push the results from each task and separately reading from it to yield to the caller. It generally worked, but with bounded queues (like queue.Queue) the performance hit was massive (around 4x the runtime of otherwise really tight code), which is an unacceptable cost for what I'm trying to do, and it seems well known that these queues are slow. Unbounded or lossy/non-blocking queues performed well, but they aren't really options here.
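For reference, the queue-based variant I mean looks roughly like the sketch below: each worker pushes results onto a shared queue.Queue, and the aggregator drains it, counting sentinels to know when every worker is done. The sentinel scheme and the one-thread-per-host layout are simplifications for illustration:

import queue
import threading

_DONE = object()  # Sentinel marking one worker as finished.

def worker(host, criteria, out_q):
    try:
        for n in range(3):  # Stand-in for the real streamed results.
            out_q.put(f"{host}: result {n}")
    finally:
        out_q.put(_DONE)

def aggregator(hosts, criteria, maxsize=1000):
    out_q = queue.Queue(maxsize=maxsize)  # Bounded, as described above.
    for host in hosts:
        threading.Thread(target=worker, args=(host, criteria, out_q),
                         daemon=True).start()
    pending = len(hosts)
    while pending:
        item = out_q.get()
        if item is _DONE:
            pending -= 1
        else:
            yield item

for result in aggregator(["alpha", "beta"], criteria=None):
    print(result)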

It seems like I SHOULD be able to create async, multithreaded, custom generators, or at least stream the results from multiple threads into a single generator in an efficient way, but I haven't worked out how.
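One mitigation I'm aware of for the per-item queue overhead (a well-known trick, not something from my code above) is to batch results before putting them on the queue, so each put/get pays its synchronization cost once per chunk instead of once per item. Reworking the hypothetical queue sketch above:

import queue
import threading

_DONE = object()  # Sentinel marking one worker as finished.

def worker(host, out_q, batch_size=256):
    batch = []
    for n in range(1000):           # Stand-in for the real streamed results.
        batch.append(f"{host}: result {n}")
        if len(batch) >= batch_size:
            out_q.put(batch)        # One synchronized put per batch_size items.
            batch = []
    if batch:
        out_q.put(batch)            # Flush the final partial batch.
    out_q.put(_DONE)

def aggregator(hosts):
    out_q = queue.Queue(maxsize=64)  # Bounds memory at ~64 in-flight batches.
    for host in hosts:
        threading.Thread(target=worker, args=(host, out_q), daemon=True).start()
    pending = len(hosts)
    while pending:
        item = out_q.get()
        if item is _DONE:
            pending -= 1
        else:
            yield from item          # Unpack a whole batch per queue round-trip.

for result in aggregator(["alpha", "beta"]):
    print(result)

Whether that closes enough of the 4x gap for my workload is an open question, but it's the usual way to amortize queue.Queue's locking.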

Question from: https://stackoverflow.com/questions/65860381/yielding-results-from-multiple-threads-or-processes-in-python


1 Reply

Waiting for answers.
