Using the distributed scheduler, I'm ingesting data from many binary source files that don't lend themselves to Dask's provided readers (e.g. `read_csv()`, `read_parquet()`, etc.), and for each binary file I'm producing a pandas DataFrame (within a delayed-decorated function).

In my infancy with Dask, I'm trying to understand how to efficiently concatenate all the pandas DataFrames into a single `dask.dataframe` for further processing. This Dask DataFrame will be larger than memory, though in my testing so far I'm using reduced data volumes.
My code results in only a single worker being active and the process takes a very long time, even though the graph visualization seems to suggest parallel operation. I don't understand why.
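For context, `read_a_file()` and `calculate_narrowband()` are `@dask.delayed`-decorated functions. Minimal dummy stand-ins are below so the snippets are runnable; the real versions parse TDMS binaries, so the shapes, column names, and timestamps here are illustrative only:

```python
import dask
import numpy as np
import pandas as pd

@dask.delayed
def read_a_file(f):
    # Stand-in for the real binary/TDMS reader: returns metadata plus raw data.
    return {"metadata": {"start": pd.Timestamp("2021-01-01")},
            "data": np.arange(3, dtype=float)}

@dask.delayed
def calculate_narrowband(metadata, data):
    # Stand-in computation: builds a small pandas DataFrame with a datetime index.
    idx = pd.date_range(metadata["start"], periods=len(data), freq="s")
    return pd.DataFrame({"power": data}, index=idx)
```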
```python
import dask
import dask.dataframe as dd

def process_data_ddf(filenames):
    narrowband_ddf_list = []
    for f in filenames:
        tdms_data = read_a_file(f)
        narrowband_df = calculate_narrowband(tdms_data["metadata"], tdms_data["data"])
        narrowband_ddf = dd.from_delayed(narrowband_df)
        narrowband_ddf_list.append(narrowband_ddf)
    narrowbands_ddf = dd.concat(narrowband_ddf_list)
    return narrowbands_ddf

result = dask.compute(process_data_ddf(filenames))
```
I tried modifying this code so that I just collect a list of pandas DataFrames and call `pd.concat()` at the end (code below). With this, all workers are active and the process completes quickly, but I don't believe this will scale well.
```python
import dask
import pandas as pd

def process_data_df(filenames):
    narrowband_df_list = []
    for f in filenames:
        tdms_data = read_a_file(f)
        narrowband_df = calculate_narrowband(tdms_data["metadata"], tdms_data["data"])
        narrowband_df_list.append(narrowband_df)
    return narrowband_df_list

result = pd.concat(dask.compute(process_data_df(filenames))[0])
```
`process_data_ddf` graph: (task-graph visualization image)

`process_data_df` graph: (task-graph visualization image)
Most of the docs seem to focus on aggregating data on import using something like `dd.read_csv('myfiles.*.csv')`. What's the best way to approach this for my use case?
CLARIFICATIONS:

- `calculate_narrowband()` and `read_a_file()` have `@dask.delayed` decorators.
- All DataFrames I'm trying to concatenate have identical columns and no duplicate index entries.
- DataFrame indices are datetimes; I don't care about sort order.
question from:
https://stackoverflow.com/questions/65647395/how-to-efficiently-concatenate-many-computed-dataframes