Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Python Stop Processes in Generator

I have an "AbstractCollector" class that provides a record generator from a collect() method. I also have a "MultiCollector" which can run multiple "AbstractCollector"s in other processes, and return their combined results from a single generator (using Queue).

My MultiCollector class looks like this:

class MultiCollector(AbstractDataCollector):
    # ... 

    def collect(self):
        sub_collectors: List[AbstractDataCollector] = ...  # create sub collectors

        m = multiprocessing.Manager()
        q = m.Queue()

        futures = []
        with ProcessPoolExecutor(max_workers=self.config.parallel_scans) as executor:
            for sub_collector in sub_collectors:
                f = executor.submit(self.run_collector_process, q, sub_collector)
                futures.append(f)

            receiving = True  # must be initialized before the loop tests it
            while receiving:
                try:
                    message = q.get_nowait()
                except Empty:
                    time.sleep(0.01)
                    message = None

                if message:
                    yield message

                # Check for any future still running
                receiving = False
                for future in futures:
                    if not future.done():
                        receiving = True
                        break

            for future in futures:
                # Force complete
                future.result(0)

    @staticmethod
    def run_collector_process(queue: Queue, collector: AbstractDataCollector):
        for data in collector.collect():
            queue.put(data)

It is used like this:

my_multi_collector = MultiCollector()

for record in my_multi_collector.collect():
  print(record)

But I would like to stop the collection process early after an arbitrary number of returned records. Under normal circumstances I could do something like this:

my_multi_collector = MultiCollector()

i = 0
for record in my_multi_collector.collect():
  i += 1
  if i > 20:
    break
  
  print(record) 

But with my MultiCollector, when I break here, the processes spawned by collect() do not know to stop.

Is it possible to know when a generator is broken out of, so I can stop the spawned processes within collect()?

Question from: https://stackoverflow.com/questions/65925206/python-stop-processes-in-generator


1 Reply


When a generator is garbage-collected or explicitly closed (via its .close() method), a GeneratorExit exception is raised inside the generator at the yield where it is suspended. You can catch this exception to perform manual cleanup when with statements and the like don't cover your use case. Breaking out of a for loop does not call close() for you, so call it explicitly if you want to avoid arbitrary cleanup delays when reference cycles are present (or when you're on a non-CPython interpreter that uses a strategy other than reference counting for memory management).
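As a small illustration of the behavior described above (the names countdown and log are made up for this sketch), the loop below breaks out early while a reference to the generator is still held. Nothing is raised inside the generator until close() is called:

```python
import inspect

log = []  # records what happens inside the generator


def countdown():
    try:
        for n in (3, 2, 1):
            yield n
    except GeneratorExit:
        log.append("GeneratorExit")
        raise  # re-raise so close() completes normally


gen = countdown()
for value in gen:
    break  # leaves gen suspended; we still hold a reference to it

state_after_break = inspect.getgeneratorstate(gen)
log_after_break = list(log)

gen.close()  # raises GeneratorExit at the suspended yield
state_after_close = inspect.getgeneratorstate(gen)
```

If the only reference were the loop's temporary, CPython would normally finalize the generator as soon as the loop ends, but that is an implementation detail rather than a guarantee.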

So in your caller, you can change:

for record in my_multi_collector.collect():
    ... body goes here ...

to:

# At top of file
from contextlib import closing  # Guarantees the generator's close() is called

# Replacing existing code
with closing(my_multi_collector.collect()) as records:
    for record in records:
        ... body goes here ...

# Leaving the with block closes the generator automatically, even on an exception
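A minimal runnable sketch of the same pattern, assuming a stand-in generator (records, cleaned_up and first_n are hypothetical names, not part of the question's code). The generator's finally block runs as soon as the with block is left:

```python
from contextlib import closing

cleaned_up = []  # filled in by the generator's finally block


def records():
    try:
        for i in range(100):
            yield i
    finally:
        cleaned_up.append(True)  # stands in for real cleanup work


def first_n(n):
    taken = []
    with closing(records()) as gen:
        for record in gen:
            if len(taken) >= n:
                break  # early exit; closing() still calls gen.close()
            taken.append(record)
    return taken


result = first_n(3)
```

Here closing() plays the role of a try/finally around the loop that calls gen.close() in the finally clause.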

Then, if the with statements inside the generator aren't enough, you can explicitly wrap the generator's code in:

try:
    ... code goes here ...
except GeneratorExit:
    ... cleanup on early exit goes here ...
    raise  # Let exception continue bubbling to ensure cleanup finishes
finally:
    ... cleanup that must always run even when run to completion goes here ...

to perform any manual cleanup required (omitting except GeneratorExit: or finally: depending on what sort of cleanup conditions apply).
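Applied to a collector that fans work out to workers, the cleanup step can set a shared stop flag that each worker checks. The sketch below is only an illustration under stated assumptions: it uses threads and threading.Event so that it runs anywhere, and worker, collect, take and stop_event are hypothetical names. With the question's ProcessPoolExecutor, the assumption is that an Event created by the same multiprocessing.Manager() as the queue would be passed to each worker instead.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing


def worker(out_q, stop_event, start):
    """Hypothetical sub-collector: emits numbers until asked to stop."""
    n = start
    while not stop_event.is_set():
        out_q.put(n)
        n += 1
        stop_event.wait(0.001)  # brief pause; returns early once the event is set


def collect(stop_event, worker_count=2):
    """Yield records from all workers; always signal them to stop on exit."""
    out_q = queue.Queue()
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = [
            executor.submit(worker, out_q, stop_event, i * 1000)
            for i in range(worker_count)
        ]
        try:
            while True:
                try:
                    record = out_q.get(timeout=0.05)
                except queue.Empty:
                    if all(f.done() for f in futures):
                        break  # workers finished and the queue is drained
                    continue
                yield record
        finally:
            # Reached on normal completion and on GeneratorExit alike.
            # The executor's with block then waits for the workers to return.
            stop_event.set()


def take(n, stop_event):
    """Consume n records, then close the generator deterministically."""
    records = []
    with closing(collect(stop_event)) as gen:
        for record in gen:
            records.append(record)
            if len(records) >= n:
                break
    return records
```

Because the flag is set in finally rather than in except GeneratorExit, the workers are told to stop whether the consumer breaks early, raises, or reads to the end.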

