When should I do dataframe.cache() and when is it useful?
Cache what you are going to use across queries (and do it early and often, up to the available memory). It does not really matter which programming language you use (Python, Scala, Java, SQL, or R), as the underlying mechanics are the same.
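For example, when the same DataFrame feeds several queries, caching it once up front avoids recomputing it for each of them (a sketch; the data and column names are made up):

// made-up example: one DataFrame reused by several queries
val users = spark.range(1000000).withColumn("isEven", $"id" % 2 === 0).cache

users.filter($"isEven").count()         // first action computes the rows and fills the cache
users.filter(!$"isEven").count()        // served from the cache
users.groupBy("isEven").count().show()  // served from the cache as well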
You can see whether a DataFrame was cached in your physical plan using the explain operator (where InMemoryRelation entries reflect cached datasets together with their storage level):
== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))
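A plan like the one above would come from a query roughly like the following (a reconstruction inferred from the plan; the original code is not shown in the answer):

// a reconstruction, not from the original: cache the range, then add a newId column
val q = spark.range(1).cache.select($"id", $"id" as "newId")
q.explain   // prints a plan with InMemoryRelation, like the one above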
After you cache (or persist) your DataFrame, the first query may get slower, but it is going to pay off for the following queries.
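You can see that effect in the spark-shell by timing the same action twice with spark.time (a sketch; actual timings depend on your data and cluster):

val df = spark.range(10 * 1000 * 1000).cache   // nothing is materialized yet

spark.time(df.count())   // first action: computes the rows and fills the cache (slower)
spark.time(df.count())   // second action: reads from the cache (faster)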
You can check whether a Dataset was cached or not using the following code:
scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false
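If you prefer a public API over the internal CacheManager, Dataset.storageLevel gives a similar signal: it returns StorageLevel.NONE until the Dataset is marked for caching (a sketch on a fresh Dataset introduced here just for illustration):

val q3 = spark.range(10)   // a fresh, not-yet-cached Dataset

scala> q3.storageLevel == org.apache.spark.storage.StorageLevel.NONE
res1: Boolean = true

scala> q3.cache.storageLevel == org.apache.spark.storage.StorageLevel.NONE
res2: Boolean = false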
Also, in my code, should I cache the DataFrames in the commented lines?
Yes and no. Cache what represents external datasets, so you don't pay the extra price of transmitting data across the network (while accessing the external storage) every time you query them.
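For instance, something along these lines (a sketch; the path, format, and column names are hypothetical):

// hypothetical external dataset: every query would otherwise re-read it over the network
val events = spark.read.parquet("hdfs:///data/events").cache

events.groupBy("type").count().show()        // first action reads from HDFS and fills the cache
events.filter($"type" === "click").count()   // subsequent queries are served from the cache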
Don't cache what you use only once or what is easy to compute. Otherwise, cache.
Be careful what you cache, i.e. which Dataset gets cached, as placing cache at different points in a query gives you different cached queries.
// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")
scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 5, step=1, splits=8)
// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
   +- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter ((id#17L % 2) = 0)
            +- *Range (0, 1, step=1, splits=8)
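The difference matters: in q1 only the output of range(5) is cached, so the filter and the projection still run on top of the cached rows for every query, whereas in q2 the fully filtered and projected result is what ends up in the cache.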
There's one surprise with caching in Spark SQL. Caching is lazy, which is why you pay the extra price of caching the rows at the very first action, but that only happens with the DataFrame API. In SQL, caching is eager by default, which makes a big difference in query performance, as you don't have to call an action to trigger caching.
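For example (a sketch; the table name is hypothetical):

// DataFrame API: lazy - nothing is materialized until the first action
val df = spark.table("events").cache

// SQL: eager by default - the rows are cached as part of executing the statement
spark.sql("CACHE TABLE events")

// SQL can still opt into lazy behaviour explicitly
spark.sql("CACHE LAZY TABLE events")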