SPARK-5063 relates to better error messages when trying to nest RDD operations, which is not supported.
It's a usability issue, not a functional one. The root cause is the nesting of RDD operations and the solution is to break that up.
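For illustration only (the contents of dRDD and mRDD below are made up), this is the kind of nesting that triggers the SPARK-5063 error message; the offending line is left commented out:

val dRDD = sc.parallelize(Seq("x", "y"))
val mRDD = sc.parallelize(Seq("x" -> 1, "y" -> 2))
// Not supported: referencing mRDD inside a transformation of dRDD raises SPARK-5063,
// because RDD transformations/actions can only be invoked by the driver.
// dRDD.map(elem => mRDD.lookup(elem))
// The fix is to break the nesting up, e.g. with a join or a broadcast as shown below.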
Here we are trying a join of dRDD and mRDD. If the size of mRDD is large, rdd.join would be the recommended way; otherwise, if mRDD is small, i.e. it fits in the memory of each executor, we could collect it, broadcast it, and do a 'map-side' join.
JOIN
A simple join would go like this:
val rdd = sc.parallelize(Seq(Array("one","two","three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six" -> 6))
// flatten the arrays and key each element by itself so it can be joined with the lookup table
val flat = rdd.flatMap(_.toSeq).keyBy(x => x)
// join on the key and keep only the (element, value) pairs
val res = flat.join(map).map{case (k,v) => v}
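For reference, collecting the result should produce something like the following (the ordering after the join's shuffle is not guaranteed):

res.collect()
// e.g. Array((one,1), (two,2), (three,3), (four,4), (five,5), (six,6)) -- order may vary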
If we would like to use a broadcast, we first need to collect the value of the resolution table locally in order to broadcast it to all executors. NOTE: the data to be broadcast MUST fit in the memory of the driver as well as of each executor.
Map-side JOIN with Broadcast variable
val rdd = sc.parallelize(Seq(Array("one","two","three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six" -> 6))
// collect the small RDD on the driver and broadcast it to every executor
val bcTable = sc.broadcast(map.collectAsMap)
// look each element up in the broadcast map locally -- no shuffle needed
val res2 = rdd.flatMap{arr => arr.map(elem => (elem, bcTable.value(elem)))}
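Since the lookup happens against the broadcast map inside flatMap, no shuffle is involved; collecting the result should give the same pairs as the join above, for example:

res2.collect()
// Array((one,1), (two,2), (three,3), (four,4), (five,5), (six,6))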