
scala - How to split a dataframe into dataframes with same column values?

Using Scala, how can I split a DataFrame into multiple DataFrames (be it an array or a collection) that share the same column value? For example, I want to split the following DataFrame:

ID  Rate    State
1   24  AL
2   35  MN
3   46  FL
4   34  AL
5   78  MN
6   99  FL

to:

data set 1

ID  Rate    State
1   24  AL  
4   34  AL

data set 2

ID  Rate    State
2   35  MN
5   78  MN

data set 3

ID  Rate    State
3   46  FL
6   99  FL


1 Reply


You can collect the unique state values and simply map over the resulting array:

val states = df.select("State").distinct.collect.flatMap(_.toSeq)
val byStateArray = states.map(state => df.where($"State" <=> state))
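Each element of byStateArray is a DataFrame holding the rows for one State value, so a quick sanity check is just a loop (a minimal sketch, assuming the sample data above; variable names are illustrative):

// states and byStateArray were built in the same order, so they can be zipped
byStateArray.zip(states).foreach { case (stateDf, state) =>
  println(s"State = $state, rows = ${stateDf.count()}")
  stateDf.show()
}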

or to a map:

val byStateMap = states
    .map(state => (state -> df.where($"State" <=> state)))
    .toMap
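Looking up a single split is then a plain map access; with the sample data the keys are the raw column values, i.e. the strings "AL", "MN" and "FL" (a minimal sketch, expected output shown as comments):

// Keys come straight from the collected distinct values
byStateMap("AL").show()
// +---+----+-----+
// | ID|Rate|State|
// +---+----+-----+
// |  1|  24|   AL|
// |  4|  34|   AL|
// +---+----+-----+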

The same thing in Python:

from itertools import chain
from pyspark.sql.functions import col

states = chain(*df.select("state").distinct().collect())

# eqNullSafe requires PySpark 2.3 or later.
# In 2.2 and earlier, col("state") == state should give the same outcome,
# ignoring NULLs; if NULLs are important, use
# (lit(state).isNull() & col("state").isNull()) | (col("state") == state)
df_by_state = {state: 
  df.where(col("state").eqNullSafe(state)) for state in states}

The obvious problem here is that it requires a full data scan for each level, so it is an expensive operation. If you're looking for a way to just split the output, see also How do I split an RDD into two or more RDDs?

In particular, you can write the Dataset partitioned by the column of interest:

val path: String = ???
df.write.partitionBy("State").parquet(path)
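On disk this produces one sub-directory per distinct State value, with the partition value encoded in the path (a sketch of the expected layout; actual part file names will differ):

// path/
//   State=AL/part-00000-....parquet
//   State=FL/part-00000-....parquet
//   State=MN/part-00000-....parquet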

and read back if needed:

// Depends on partition pruning
for { state <- states } yield spark.read.parquet(path).where($"State" === state)

// or explicitly read the partition
for { state <- states } yield spark.read.parquet(s"$path/State=$state")

Depending on the size of the data, the number of levels to split by, and the storage and persistence level of the input, it might be faster or slower than multiple filters.

