Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

scala - Coalesce reduces parallelism of entire stage (spark)

Sometimes Spark "optimizes" a dataframe plan in an inefficient way. Consider the following example in Spark 2.1 (can also be reproduced in Spark 1.6):

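import org.apache.spark.sql.functions.udf

val df = sparkContext.parallelize((1 to 500).map(i => scala.util.Random.nextDouble()), 100).toDF("value")

// Simulates an expensive per-row computation
val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d })

val df_result = df
  .withColumn("udfResult", expensiveUDF($"value"))

df_result
  .coalesce(1)
  .write.saveAsTable(tablename) // DataFrame.saveAsTable is gone in Spark 2.x; use the writer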

In this example I want to write a single file after an expensive transformation of a dataframe (this is just an example to demonstrate the issue). Spark moves the coalesce(1) up, so that the UDF is only applied to a dataframe containing 1 partition, thus destroying parallelism (interestingly, repartition(1) does not behave this way).

To generalize, this behavior occurs when I want to increase parallelism in a certain part of my transformation, but decrease parallelism thereafter.

I've found one workaround which consists of caching the dataframe and then triggering the complete evaluation of the dataframe:

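import org.apache.spark.sql.functions.udf

val df = sparkContext.parallelize((1 to 500).map(i => scala.util.Random.nextDouble()), 100).toDF("value")

// Simulates an expensive per-row computation
val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d })

val df_result = df
  .withColumn("udfResult", expensiveUDF($"value"))
  .cache

df_result.rdd.count // trigger computation with the original 100 partitions

df_result
  .coalesce(1)
  .write.saveAsTable(tablename) // DataFrame.saveAsTable is gone in Spark 2.x; use the writer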

My question is: is there another way to tell Spark not to decrease parallelism in such cases?

1 Reply

Actually, this is not caused by Spark SQL's optimizer. Spark SQL does not change the position of the Coalesce operator, as the executed plan shows:

Coalesce 1
+- *Project [value#2, UDF(value#2) AS udfResult#11]
   +- *SerializeFromObject [input[0, double, false] AS value#2]
      +- Scan ExternalRDDScan[obj#1]

Here is a paragraph quoted from the coalesce API documentation:

Note: This paragraph was added by the JIRA issue SPARK-19399, so it should not appear in the 2.0 API documentation.

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

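The coalesce API doesn't perform a shuffle. Instead, it creates a narrow dependency between the previous RDD and the coalesced RDD. Because RDDs are evaluated lazily, the upstream computation (here, the UDF) runs in the same stage as the coalesce, and that stage has only as many tasks as there are coalesced partitions.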
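
The difference in dependencies can be inspected on plain RDDs. The following is a minimal sketch, not part of the original answer: the object name, the local[4] master, and the cheap map standing in for the expensive UDF are all assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

object CoalesceLineage {
  // Local session; master and app name are assumptions for this sketch.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[4]")
    .appName("coalesce-lineage-sketch")
    .getOrCreate()

  // 100 input partitions, followed by a map that stands in for the expensive UDF.
  lazy val mapped = spark.sparkContext
    .parallelize(1 to 500, 100)
    .map(i => i.toDouble)

  // coalesce(1): narrow dependency, no shuffle, so the map runs in the same stage.
  lazy val coalesced = mapped.coalesce(1)

  // repartition(1): adds a shuffle, so the map stays in its own 100-partition stage.
  lazy val repartitioned = mapped.repartition(1)

  def main(args: Array[String]): Unit = {
    println(coalesced.toDebugString)     // lineage without a shuffle boundary
    println(repartitioned.toDebugString) // lineage containing a ShuffledRDD
    spark.stop()
  }
}
```

In the coalesce lineage there is no shuffle boundary, so the map is scheduled as part of the single-task stage. In the repartition lineage the map sits upstream of the shuffle and keeps its original partitioning.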

To prevent this, use the repartition API instead.
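
Applied to the example from the question, this could look roughly like the sketch below. It assumes a Spark 2.x or later SparkSession running locally; the object name, the local[4] master, and the table name udf_result_demo are hypothetical and only there to make the sketch self-contained.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

object RepartitionBeforeWrite {
  // Local session; master and app name are assumptions for this sketch.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[4]")
    .appName("repartition-sketch")
    .getOrCreate()

  // Same pipeline as in the question: 100 partitions and a slow UDF.
  def udfResult(): DataFrame = {
    import spark.implicits._
    val df = spark.sparkContext
      .parallelize((1 to 500).map(_ => scala.util.Random.nextDouble()), 100)
      .toDF("value")
    val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d })
    df.withColumn("udfResult", expensiveUDF($"value"))
  }

  def main(args: Array[String]): Unit = {
    // repartition(1) shuffles, so the UDF still runs on the 100 upstream partitions.
    val single = udfResult().repartition(1)
    single.explain() // the physical plan should now contain an exchange (shuffle) step
    single.write.mode("overwrite").saveAsTable("udf_result_demo") // hypothetical table name
    spark.stop()
  }
}
```

The trade-off is the one the API documentation describes: the extra shuffle moves all rows to one partition, but the expensive step is executed in parallel before that happens.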

