
python - Dask delayed sum gets killed but there are enough resources

I'm creating a function that reads an entire folder, creates a Dask dataframe, then processes the partitions of this dataframe and sums the results, like this:

import dask.dataframe as dd
from dask import delayed, compute

def partitions_func(folder):
    df = dd.read_csv(f'{folder}/*.csv')
    partial_results = []
    for partition in df.partitions:
        # another_function is itself delayed (see below)
        partial = another_function(partition)
        partial_results.append(partial)
    # Wrap sum in delayed so it runs once all partials are ready
    total = delayed(sum)(partial_results)
    return total

The function called inside partitions_func (another_function) is also delayed:

@delayed
def another_function(partition):
    # Partition processing
    return result

I checked, and the variables created during processing are all small, so they shouldn't cause any issues. The partitions can be quite large, but not larger than the available RAM.

When I execute partitions_func(folder), the process gets killed. At first, I thought the problem had to do with having two layers of delayed: one on another_function and one on delayed(sum).
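For context, the call looks roughly like this (the folder name is just a placeholder):

total = partitions_func('data_folder')  # placeholder path
result = total.compute()                # evaluating the delayed graph triggers the kill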

Removing the delayed decorator from another_function causes issues, because the argument is then a Dask dataframe and you can't do operations like tolist() on it. I also tried removing delayed from sum, because I thought it could be a problem with parallelisation and the available resources, but the process still gets killed.

However, I know there are 5 partitions. If I remove the statement total = delayed(sum)(partial_results) from partitions_func and compute the sum "manually" instead, everything works as expected:

total = (partial_results[0].compute() + partial_results[1].compute()
         + partial_results[2].compute() + partial_results[3].compute()
         + partial_results[4].compute())
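For what it's worth, the same manual sum can also be written as a single compute call over the list (a sketch; unlike the chained version above, this evaluates everything in one graph, so I'm not sure it avoids the crash):

from dask import compute

# One call evaluates all partials in a single graph,
# whereas chaining .compute() runs five separate graphs sequentially
total = sum(compute(*partial_results))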

Thanks!

question from: https://stackoverflow.com/questions/66046996/dask-delayed-sum-gets-killed-but-there-are-enough-resources


1 Reply


A Dask dataframe is itself built from a series of delayed objects, so when you call a delayed function like another_function on a partition, you get a nested delayed, which dask.compute cannot handle. One option is to use .map_partitions(); the typical example is df.map_partitions(len).compute(), which computes the length of each partition. So if you can rewrite another_function to accept a pandas dataframe and remove the delayed decorator, your code will roughly look like this:

df = dd.read_csv(f'{folder}/*.csv')
total = df.map_partitions(another_function)  # one task per partition

Now total is a lazy Dask object which you can pass to dask.compute (or simply run total = df.map_partitions(another_function).compute()).
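For completeness, a minimal self-contained sketch of this rewrite (the per-partition logic is an assumption, since the original another_function isn't shown, and the folder path is a placeholder):

import dask.dataframe as dd
import pandas as pd

def another_function(partition: pd.DataFrame):
    # Hypothetical per-partition processing; substitute your real logic
    return len(partition)

df = dd.read_csv('data_folder/*.csv')                # placeholder path
per_partition = df.map_partitions(another_function)  # one value per partition
total = per_partition.sum().compute()                # single compute over the whole graph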

