I'm creating a function that reads an entire folder, creates a Dask dataframe, then processes the partitions of this dataframe and sums the results, like this:
import dask.dataframe as dd
from dask import delayed, compute

def partitions_func(folder):
    df = dd.read_csv(f'{folder}/*.csv')
    partial_results = []
    for partition in df.partitions:
        partial = another_function(partition)
        partial_results.append(partial)
    total = delayed(sum)(partial_results)
    return total
The function being called in partitions_func (another_function) is also delayed:
@delayed
def another_function(partition):
    # Partition processing
    return result
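For what it's worth, stacking delayed like this is legal on its own; here is a toy sketch (double is a hypothetical stand-in for another_function) showing the same two-level structure computing fine at small scale:

from dask import delayed

@delayed
def double(x):
    # Stand-in for the real partition processing
    return 2 * x

parts = [double(i) for i in range(5)]  # five delayed results
total = delayed(sum)(parts)            # same two-level delayed structure
print(total.compute())                 # prints 20; nothing ran before this line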
I checked and the variables created during the processing are all small, so they shouldn't cause any issues. The partitions can be quite large, but no larger than the available RAM.
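In case it helps, this is one standard way to verify that claim (the path is a placeholder, and deep=True is my choice so that string memory is counted too):

import dask.dataframe as dd

df = dd.read_csv('folder/*.csv')  # placeholder path
# Bytes each partition occupies once loaded into pandas, one value per partition
sizes = df.map_partitions(lambda part: part.memory_usage(deep=True).sum())
print(sizes.compute())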
When I execute partitions_func(folder), the process gets killed. At first, I thought the problem had to do with having two levels of delayed, one on another_function and one on delayed(sum).
Removing the delayed decorator from another_function causes issues because the argument is a Dask dataframe and you can't do operations like tolist(). I tried removing delayed from sum, because I thought it could be a problem with parallelisation and the available resources, but the process also gets killed.
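A related test for the parallelisation theory (a standard Dask option, offered as a sketch rather than something already tried here) is to force the single-threaded scheduler, which runs one task at a time so peak memory stays near a single partition:

total = partitions_func(folder)
# 'synchronous' is Dask's built-in single-threaded scheduler
result = total.compute(scheduler='synchronous')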
However, I know there are 5 partitions. If I remove the statement total = delayed(sum)(partial_results) from partitions_func and compute the sum "manually" instead, everything works as expected:

total = (partial_results[0].compute() + partial_results[1].compute()
         + partial_results[2].compute() + partial_results[3].compute()
         + partial_results[4].compute())
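For completeness, the same manual approach without hardcoding the five partitions would be:

# Compute each partial result sequentially, so only one is in memory at a time
total = sum(partial.compute() for partial in partial_results)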
Thanks!
question from: https://stackoverflow.com/questions/66046996/dask-delayed-sum-gets-killed-but-there-are-enough-resources