
python - How to efficiently concatenate many computed dataframes?

Using the distributed scheduler, I'm ingesting data from many binary source files that don't lend themselves to Dask's built-in readers (e.g. read_csv(), read_parquet()), and for each binary file I produce a pandas DataFrame inside a delayed-decorated function.
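For reference, the helpers look roughly like the sketch below; the bodies are illustrative stand-ins I've made up for this question, and only the names, the @dask.delayed decoration, and the return shapes match my real code:

import dask
import numpy as np
import pandas as pd

@dask.delayed
def read_a_file(filename):
    # stand-in for the real binary reader: the real version parses the
    # file and returns its metadata alongside the raw samples
    return {"metadata": {"start": "2021-01-01"}, "data": np.random.rand(1000)}

@dask.delayed
def calculate_narrowband(metadata, data):
    # stand-in computation: reduces one file's samples to a pandas
    # DataFrame with a datetime index (see clarifications below)
    index = pd.date_range(metadata["start"], periods=len(data), freq="s")
    return pd.DataFrame({"narrowband": data}, index=index)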

I'm new to Dask and am trying to understand how to efficiently concatenate all of these pandas DataFrames into a single dask.dataframe for further processing. That dask.dataframe will be larger than memory, though in my testing so far I'm using reduced data volumes.

My code results in only a single worker being active, and the process takes a very long time, even though the graph visualization seems to suggest parallel operation. I don't understand why.

import dask
import dask.dataframe as dd

def process_data_ddf(filenames):
    narrowband_ddf_list = []
    for f in filenames:
        tdms_data = read_a_file(f)
        narrowband_df = calculate_narrowband(tdms_data["metadata"], tdms_data["data"])
        # wrap each delayed DataFrame as its own one-partition dask.dataframe
        narrowband_ddf = dd.from_delayed(narrowband_df)
        narrowband_ddf_list.append(narrowband_ddf)
    # concatenate the per-file dask.dataframes into a single one
    narrowbands_ddf = dd.concat(narrowband_ddf_list)
    return narrowbands_ddf

result = dask.compute(process_data_ddf(filenames))

I tried modifying this code so that I just collect a list of pandas DataFrames and call pd.concat() at the end (code below). With this, all workers are active and the process completes quickly, but I don't believe it will scale well, since every DataFrame is gathered back to the client and concatenated in local memory.

import pandas as pd

def process_data_df(filenames):
    narrowband_df_list = []
    for f in filenames:
        tdms_data = read_a_file(f)
        narrowband_df = calculate_narrowband(tdms_data["metadata"], tdms_data["data"])
        narrowband_df_list.append(narrowband_df)
    return narrowband_df_list

# gathers every DataFrame to the client, then concatenates locally
result = pd.concat(dask.compute(process_data_df(filenames))[0])

(Task-graph visualizations for process_data_ddf and process_data_df were attached here as images; not reproduced.)

Most of the docs seem to focus on aggregating data at import time using something like dd.read_csv('myfiles.*.csv'). What's the best way to approach this for my use case?

CLARIFICATIONS:

  • calculate_narrowband() and read_a_file() are decorated with @dask.delayed.
  • All DataFrames I'm trying to concatenate have identical columns and no duplicate index values.
  • DataFrame indexes are datetimes; I don't care about sort order.
question from: https://stackoverflow.com/questions/65647395/how-to-efficiently-concatenate-many-computed-dataframes


1 Reply


I misunderstood the purpose of from_delayed() and now realize it can accept a list of delayed objects.

This appears to perform well in initial testing:

def process_data_ddf(filenames):
    narrowband_df_list = []
    for f in filenames:
        tdms_data = read_a_file(f)
        narrowband_df = calculate_narrowband(tdms_data["metadata"], tdms_data["data"])
        narrowband_df_list.append(narrowband_df)
    # a single from_delayed() call turns the whole list of delayed pandas
    # DataFrames into one dask.dataframe, one partition per file
    narrowband_ddf = dd.from_delayed(narrowband_df_list)
    return narrowband_ddf
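
A usage note on top of this (my addition; the column name in meta below is hypothetical): if I understand the docs correctly, dd.from_delayed() infers column metadata by evaluating the first delayed object unless you pass meta, so supplying it explicitly keeps construction fully lazy, and nothing runs in parallel until compute() or persist() is called:

ddf = process_data_ddf(filenames)

# optional: pass explicit metadata inside process_data_ddf to skip the
# inference step, e.g.
#   meta = pd.DataFrame({"narrowband": pd.Series(dtype="f8")},
#                       index=pd.DatetimeIndex([]))
#   dd.from_delayed(narrowband_df_list, meta=meta)

result = ddf.compute()  # or ddf.persist() to keep it on the cluster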
