Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
972 views
in Technique[技术] by (71.8m points)

performance - Spark: Explicit caching can interfere with Catalyst optimizer's ability to optimize some queries?

I'm studying to take the data bricks to spark certification exam, and their practice exam ( please see > https://databricks-prod-cloudfront.cloud.databricks.com/public/793177bc53e528530b06c78a4fa0e086/0/6221173/100020/latest.html ) requires us to accept this statement as true fact:

"Explicit caching can decrease application performance by interfering with the Catalyst optimizer's ability to optimize some queries"

I got this question wrong even though I have read up a lot on the catalyst and have a pretty good grasp of the details. So I wanted to shore up my knowledge of this topic and go to the source which explains the how's and why's behind this assertion.

Can anyone provide guidance about this? Specifically, why is this so? and how do we ensure that when we cache our datasets we are not actually getting in the way of the optimizer and making things worse? /Thanks!

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

How and why can a cache decrease the performances ?

Let's use a simple example to demonstrate that :

// Some data
val df = spark.range(100)

df.join(df, Seq("id")).filter('id <20).explain(true)

Here, the catalyst plan will optimize this join by doing a filter on each dataframe before joining, to reduce the amount of data that will get shuffled.

== Optimized Logical Plan ==
Project [id#0L]
+- Join Inner, (id#0L = id#69L)
   :- Filter (id#0L < 20)
   :  +- Range (0, 100, step=1, splits=Some(4))
   +- Filter (id#69L < 20)
      +- Range (0, 100, step=1, splits=Some(4))

If we cache the query after the join, the query won't be as optimized, as we can see here :

df.join(df, Seq("id")).cache.filter('id <20).explain(true)

== Optimized Logical Plan ==
Filter (id#0L < 20)
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *Project [id#0L]
         +- *BroadcastHashJoin [id#0L], [id#74L], Inner, BuildRight
            :- *Range (0, 100, step=1, splits=4)
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
               +- *Range (0, 100, step=1, splits=4)

The filter is done at the very end ...

Why so ? Because a cache writes on the disk the dataframe. So every consequent queries will use this cached / written on disk DataFrame, and so it will optimize only the part of the query AFTER the cache. We can check that with the same example !

df.join(df, Seq("id")).cache.join(df, Seq("id")).filter('id <20).explain(true)

== Optimized Logical Plan ==
Project [id#0L]
+- Join Inner, (id#0L = id#92L)
   :- Filter (id#0L < 20)
   :  +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :        +- *Project [id#0L]
   :           +- *BroadcastHashJoin [id#0L], [id#74L], Inner, BuildRight
   :              :- *Range (0, 100, step=1, splits=4)
   :              +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   :                 +- *Range (0, 100, step=1, splits=4)
   +- Filter (id#92L < 20)
      +- Range (0, 100, step=1, splits=Some(4))

The filter is done before the second join, but after the first one because it is cached.

How to avoid ?

By knowing what you do ! You can simply compares catalyst plans and see what optimizations Spark is missing.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...