Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
175 views
in Technique[技术] by (71.8m points)

python - How to save a huge pandas dataframe to hdfs?

Im working with pandas and with spark dataframes. The dataframes are always very big (> 20 GB) and the standard spark functions are not sufficient for those sizes. Currently im converting my pandas dataframe to a spark dataframe like this:

dataframe = spark.createDataFrame(pandas_dataframe)  

I do that transformation because with spark writing dataframes to hdfs is very easy:

dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")

But the transformation is failing for dataframes which are bigger than 2 GB. If I transform a spark dataframe to pandas I can use pyarrow:

// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")

// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)

// delete temp files
hdfs.delete(path, recursive=True)

This is a fast converstion from spark to pandas and it also works for dataframes bigger than 2 GB. I yet could not find a way to do it the other way around. Meaning having a pandas dataframe which I transform to spark with the help of pyarrow. The problem is that I really cant find how to write a pandas dataframe to hdfs.

My pandas version: 0.19.0

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Meaning having a pandas dataframe which I transform to spark with the help of pyarrow.

pyarrow.Table.fromPandas is the function your looking for:

Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True)

Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa

pdf = ...  # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf)  # type: pyarrow.lib.Table

The result can be written directly to Parquet / HDFS without passing data via Spark:

import pyarrow.parquet as pq

fs  = pa.hdfs.connect()

with fs.open(path, "wb") as fw
    pq.write_table(adf, fw)

See also

Spark notes:

Furthermore since Spark 2.3 (current master) Arrow is supported directly in createDataFrame (SPARK-20791 - Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame). It uses SparkContext.defaultParallelism to compute number of chunks so you can easily control the size of individual batches.

Finally defaultParallelism can be used to control number of partitions generated using standard _convert_from_pandas, effectively reducing size of the slices to something more manageable.

Unfortunately these are unlikely to resolve your current memory problems. Both depend on parallelize, therefore store all data in memory of the driver node. Switching to Arrow or adjusting configuration can only speedup the process or address block size limitations.

In practice I don't see any reason to switch to Spark here, as long as you use local Pandas DataFrame as the input. The most severe bottleneck in this scenario is driver's network I/O and distributing data won't address that.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...