Alright, I'm trying to solve a specific kind of problem, and I'm trying to wrap my brain around how I could solve it with asyncio, coroutines, or ThreadPoolExecutors, but I haven't cracked a solution for what I'm trying to do yet.
Generally, I'm trying to run a series of tasks in parallel, retrieve results from each in some way into an aggregator, and yield them to a caller. It'd be something like this (in pseudo-code):
def worker(host, criteria):
    # Could be getting results from a subprocess, making a remote HTTP
    # request, sshing to a remote system, etc. The result set will
    # potentially be large, and the operation may take a while before it
    # starts generating results.
    results = do_something(host, criteria)
    for result in results:
        # ...do some processing (filter the result, transform it, etc.)
        yield result
def aggregator(hosts, criteria, numThreads=8):
    # Or some other pool: ThreadPoolExecutor, a process pool, etc.
    pool = ThreadPool(processes=numThreads)
    threads = []
    for host in hosts:
        thread = pool.apply_async(worker, (host, criteria))
        threads.append(thread)
    while threads:
        for thread in threads:
            # Check for results from the thread; if there are any, yield them.
            # As each thread completes, remove it from "threads".
            ...
    pool.close()

for result in aggregator(hosts, criteria):
    print(result)  # Or whatever
The intent here is to parallelize tasks across some workset (a group of hosts, URLs, files, whatever) and to provide a single stream of results back to the caller as they become available. The workset could be a large number of things, and I potentially don't want to do them ALL in parallel (mostly for resource reasons), which is why I've tended towards thread or process pools (I know queues could possibly work too). I've alternately cached results to disk, used callback routines, etc., but those aren't ideal.
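For instance, with concurrent.futures the pool itself caps concurrency and as_completed hands results back as each task finishes, but each host's results only arrive as one whole return value once that host is completely done, not as a stream. A rough sketch of that shape (reusing worker from above; names are illustrative):

from concurrent.futures import ThreadPoolExecutor, as_completed

def aggregator(hosts, criteria, num_workers=8):
    def run_one(host):
        # Materialise the whole worker generator inside the pool thread.
        return list(worker(host, criteria))

    # The pool caps how many hosts are processed concurrently.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = [pool.submit(run_one, host) for host in hosts]
        for future in as_completed(futures):
            # Each future only completes once its host's full result list is
            # built, so results arrive per host rather than as a live stream.
            yield from future.result()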
While actual code along the lines of the pseudo-code above generally works, it results in each task being executed serially, which makes sense: with a generator worker, apply_async effectively just hands back the generator object, and the real work only happens when the caller iterates it.
I've experimented with using queues: each task pushes its results onto a queue, and a separate loop reads from it and yields to the caller. It generally worked, but with bounded queues (like queue.Queue) the performance hit was massive: roughly 4x the runtime compared to otherwise very tight code, which is an unacceptable hit for what I'm trying to do, and it seems well known that these queues are slow. Unbounded or lossy/non-blocking queues performed well, but they aren't really options for me.
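Simplified, the queue-based version looked something along these lines (a sketch, not my exact code; the sentinel counting is how I know all workers have finished):

import queue
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # sentinel: one host's worker has finished

def aggregator(hosts, criteria, num_workers=8, maxsize=1000):
    results = queue.Queue(maxsize=maxsize)  # bounded: every put/get goes through the lock

    def run_one(host):
        try:
            for result in worker(host, criteria):
                # One blocking put per item; this per-item locking seems to be
                # where the time goes.
                results.put(result)
        finally:
            results.put(_DONE)

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        for host in hosts:
            pool.submit(run_one, host)

        remaining = len(hosts)
        while remaining:
            item = results.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item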
It seems like I SHOULD be able to create async, multithreaded, custom generators, or at least stream the results from multiple threads into a single generator (in an efficient way) - but I haven't worked out how.
question from:
https://stackoverflow.com/questions/65860381/yielding-results-from-multiple-threads-or-processes-in-python