The answer to both questions is NO:
DataFrames are driver-side abstractions of distributed collections. They cannot be used, created, or referenced in any executor-side transformation.
Why?
DataFrames (like RDDs and Datasets) can only be used within the context of an active SparkSession; without it, the DataFrame cannot "point" to its partitions on the active executors. The SparkSession should be thought of as a live "connection" to the cluster of executors.
Now, if you try using a DataFrame inside another transformation, that DataFrame would have to be serialized on the driver side, sent to the executor(s), and then deserialized there. But this deserialized instance (in a separate JVM) would necessarily lose its SparkSession: that "connection" was from the driver to the executors, not from the executor we're now operating in.
So what should you do?
You have a few options for referencing one DataFrame's data in another, and choosing the right one depends mostly on the amount of data that would have to be shuffled (i.e., transferred between executors):
1. Collect one of the DataFrames (if you can guarantee it's small!), and then use the resulting local collection (either directly or via `spark.sparkContext.broadcast`) in any transformation. See the first sketch after this list.
2. Join the two DataFrames on some common fields. This is a very common solution, as the logic of using one DataFrame's data when transforming another usually amounts to some kind of "lookup" of the right value based on some subset of the columns. This use case translates into a JOIN operation rather naturally (second sketch below).
3. Use set operators like `except`, `intersect`, and `union`, if they provide the logical operation you're after (third sketch below).
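A minimal sketch of option 1, reusing the hypothetical `users` and `scores` DataFrames from the snippet above: the small DataFrame is collected to the driver, turned into a plain `Map`, and broadcast, so executors only ever touch a local data structure.

```scala
// Option 1: collect the small DataFrame, broadcast the local result.
// `scores.collect()` runs on the driver, so `scores` must be small enough
// to fit in driver memory.
val scoreById: Map[Int, Int] = scores
  .collect()
  .map(row => row.getInt(0) -> row.getInt(1))
  .toMap

val scoreByIdBc = spark.sparkContext.broadcast(scoreById)

// The closure now captures only the broadcast handle, not a DataFrame,
// so it is safe to use inside executor-side transformations.
val withScores = users.map { row =>
  val id = row.getInt(0)
  (id, row.getString(1), scoreByIdBc.value.getOrElse(id, 0))
}.toDF("id", "name", "score")

withScores.show()
```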
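Option 2 as a sketch over the same hypothetical DataFrames. Here the driver only builds the query plan, and Spark performs the lookup as a distributed join; when one side is known to be small, a broadcast hint avoids shuffling the large side:

```scala
import org.apache.spark.sql.functions.broadcast

// Option 2: express the "lookup" as a join on the common column(s).
val joined = users.join(scores, Seq("id"), "left_outer")

// Broadcast hint: ship the small side to every executor instead of
// shuffling both sides.
val joinedFast = users.join(broadcast(scores), Seq("id"))

joined.show()
```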
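And a sketch of option 3, assuming two DataFrames with identical schemas (here, illustrative single-column DataFrames of ids):

```scala
// Option 3: set operators, which require both DataFrames to share a schema.
val a = Seq(1, 2, 3).toDF("id")
val b = Seq(2, 3, 4).toDF("id")

a.union(b).show()      // all rows from both sides (duplicates kept)
a.intersect(b).show()  // rows appearing in both: 2, 3
a.except(b).show()     // rows in `a` but not in `b`: 1
```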