It cannot be done directly. Column
is not a data structure but a representation of a specific SQL expression. It is not bound to a specific data. You'll have to transform your data first. One way to approach this is to parallelize
and join
by index:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, DoubleType}
val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y")
val aList = List(1.0, -1.0, 0.0)
val rows = df.rdd.zipWithIndex.map(_.swap)
.join(sc.parallelize(aList).zipWithIndex.map(_.swap))
.values
.map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) }
sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
Another similar approach is to index and use and UDF to handle the rest:
import scala.util.Try
val indexedDf = sqlContext.createDataFrame(
df.rdd.zipWithIndex.map {
case (row: Row, i: Long) => Row.fromSeq(row.toSeq :+ i)
},
df.schema.add("idx_", "long")
)
def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption)
indexedDf.withColumn("z", addValue(aList.toVector)($"idx_"))
Unfortunately both solutions will suffer from the issues. First of all passing local data through driver introduces a serious bottleneck in your program. Typically data should accessed directly from the executors. Another problem are growing RDD lineages if you want to perform this operation iteratively.
While the second issue can be addressed by checkpointing the first one makes this idea useless in general. I would strongly suggest that you either build completely structure first, and read it on Spark, or rebuild you pipeline in a way that can leverage Spark architecture. For example if data comes from an external source perform reads directly for each chunk of data using map
/ mapPartitions
.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…