I have a large job queue that I want to be able to abort early.
Later jobs depend on the results of earlier jobs, so I cannot queue all jobs at once. Consider the following MWE:
from dask.distributed import Client, as_completed
import numpy as np

def work(_):
    return np.random.random(size=(100_000, 50))

def main(func):
    with Client() as client:
        futures = client.map(func, range(10), pure=False)  # pre-determined work
        ac = as_completed(futures, with_results=True)
        for future, result in ac:
            new_future = client.submit(func, 0, pure=False)  # work depends on earlier output
            ac.add(new_future)
            break  # some condition is met, remaining jobs are irrelevant & can be aborted/discarded
        ac.clear()

if __name__ == '__main__':
    main(work)
The above example will generally produce the following errors:
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:58617'], work-24a67a6d-4479-4f62-9865-bd48442198c4
NoneType: None
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'work-24a67a6d-4479-4f62-9865-bd48442198c4': ('tcp://127.0.0.1:58617',)}
I was hoping as_completed.clear would cleanly deal with the remaining futures.
I also made a variation that keeps track of all futures and cancels them before calling as_completed.clear, but it produces similar results.
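For reference, this is roughly what that variation looks like (the pending list is just my own bookkeeping; the idea is to call client.cancel on every outstanding future before clearing):

from dask.distributed import Client, as_completed
import numpy as np

def work(_):
    return np.random.random(size=(100_000, 50))

def main(func):
    with Client() as client:
        futures = client.map(func, range(10), pure=False)
        pending = list(futures)                    # keep a handle on every submitted future
        ac = as_completed(futures, with_results=True)
        for future, result in ac:
            new_future = client.submit(func, 0, pure=False)
            pending.append(new_future)             # track follow-up work as well
            ac.add(new_future)
            break                                  # abort condition reached
        client.cancel(pending)                     # explicitly cancel everything first
        ac.clear()

if __name__ == '__main__':
    main(work)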
Is there a proper way to achieve this intended behavior?
Notes:
- If work returns None instead, the problem still seems to occur, just less frequently.
- I tested this on my Windows machine and from an Ubuntu Docker container.
- I used Python 3.8.3 and dask/distributed 2020.12.0.