OK, so what's wrong here is that each task carries the NumPy array x, which is large. For each of the 100 tasks that we submit, x has to be serialized, sent up to the scheduler, and then forwarded on to a worker.
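To get a feel for the serialization cost, here is a rough stdlib sketch that measures how many bytes a naive submit pattern would ship; the array size is an illustrative stand-in for "large":

```python
# Illustrative only: measure the pickled size of an array and the total
# bytes 100 naive task submissions would ship. The 1_000_000-element
# array is an assumed stand-in for the large x in the text.
import pickle

import numpy as np

x = np.ones(1_000_000)              # ~8 MB of float64 data

payload = pickle.dumps(x)
per_task_bytes = len(payload)       # bytes shipped for ONE task
total_bytes = per_task_bytes * 100  # bytes shipped for 100 naive submits

print(per_task_bytes)               # roughly 8 MB
print(total_bytes)                  # roughly 800 MB of repeated traffic
```

Scattering the array first means paying that per-task cost only once.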
Instead, we'll send the array up to the cluster once:
[future] = c.scatter([x])
Now future is a token that points to an array x that lives on the cluster. We can submit tasks that refer to this remote future, instead of to the NumPy array on our local client.
# futures = [c.submit(f, x, param) for param in params] # sends x each time
futures = [c.submit(f, future, param) for param in params] # refers to remote x already on cluster
This is now much faster, and lets Dask control data movement more effectively.
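Putting the pieces together, a minimal end-to-end sketch might look like the following, assuming dask.distributed is installed; the function f, the array size, and the params list are all placeholders for illustration:

```python
# Minimal sketch: scatter a large array once, then submit many tasks
# that reference the resulting future. `f` and `params` are assumed
# placeholders, not part of any real API.
import numpy as np
from dask.distributed import Client


def f(arr, param):
    # Example task: scale the array's sum by param
    return float(arr.sum()) * param


client = Client(processes=False)   # small in-process cluster for the demo

x = np.ones(1_000_000)             # stand-in for the large array
params = [1, 2, 3]

[x_future] = client.scatter([x])   # ship x to the cluster once
futures = [client.submit(f, x_future, p) for p in params]
results = client.gather(futures)
print(results)                     # [1000000.0, 2000000.0, 3000000.0]

client.close()
```

Because every task holds only the small future token, the scheduler is free to run the tasks wherever x already lives.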
Scatter data to all workers
If you expect to eventually need the array x on all workers, then you may want to broadcast the array at the start:
[future] = c.scatter([x], broadcast=True)
Use Dask Delayed
Futures work fine with dask.delayed as well. There is no performance benefit here, but some people prefer this interface:
# futures = [c.submit(f, future, param) for param in params]
from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)
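For completeness, a runnable sketch of the delayed variant, again assuming dask.distributed is installed and using the same placeholder f and params as above:

```python
# Sketch of the dask.delayed interface over a scattered future.
# `f` and `params` are assumed placeholders for illustration.
import numpy as np
from dask import delayed
from dask.distributed import Client


def f(arr, param):
    # Example task: scale the array's sum by param
    return float(arr.sum()) * param


client = Client(processes=False)

x = np.ones(1_000_000)
params = [1, 2, 3]
[x_future] = client.scatter([x])

lazy_values = [delayed(f)(x_future, p) for p in params]
futures = client.compute(lazy_values)   # one future per delayed value
results = client.gather(futures)
print(results)                          # [1000000.0, 2000000.0, 3000000.0]

client.close()
```

Nothing runs until client.compute is called, at which point each delayed value becomes an ordinary future.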