It's been a while since I posted the question and it seems that some other people would like to get an answer as well. Below is what I found.
So the original task was to append a column with row identifiers (basically, a sequence 1 to numRows) to any given data frame, so the row order/presence can be tracked (e.g. when you sample). This can be achieved by something along these lines:
import org.apache.spark.sql.Row

// textFile lives on SparkContext (sc), not SQLContext;
// zipWithIndex pairs each line with its stable index.
sc.textFile(file).
  zipWithIndex().
  map { case (d, i) => i.toString + delimiter + d }.
  map(_.split(delimiter)).
  map(s => Row.fromSeq(s.toSeq))
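To get an actual DataFrame back, the resulting RDD[Row] still needs a schema that includes the new id column. A minimal sketch, assuming the chain above is bound to a val rows and the file has a single string column col (both names are my own, not from the original code):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// "id" is the appended row identifier; "col" stands in for whatever
// columns the file actually contains (all read as strings here).
val schema = StructType(Seq(
  StructField("id", StringType, nullable = false),
  StructField("col", StringType, nullable = true)
))
val dfWithId = sqlContext.createDataFrame(rows, schema)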
Regarding the general case of appending any column to any data frame:

The "closest" to this functionality in the Spark API are withColumn and withColumnRenamed. According to the Scala docs, the former "Returns a new DataFrame by adding a column." In my opinion, this definition is a bit confusing and incomplete. Both of these functions can operate on this data frame only, i.e. given two data frames df1 and df2 with column col:
val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
So unless you can manage to transform a column in an existing data frame into the shape you need, you can't use withColumn or withColumnRenamed to append arbitrary columns (standalone ones or ones from other data frames).
As was commented above, the workaround may be to use a join - this would be pretty messy, although possible - attaching unique keys like above with zipWithIndex to both data frames or columns might work. Although efficiency is ...
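For illustration, here is a minimal sketch of that join workaround. It assumes a spark-shell where sc and sqlContext are in scope; the helper name addRowId and the key columns rowId/rowId2 are hypothetical names of my own:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Attach a synthetic key column via zipWithIndex so two unrelated
// data frames can be lined up row by row.
def addRowId(df: DataFrame): DataFrame = {
  val indexed = df.rdd.zipWithIndex().map { case (row, id) =>
    Row.fromSeq(row.toSeq :+ id)
  }
  val schema = StructType(df.schema.fields :+
    StructField("rowId", LongType, nullable = false))
  sqlContext.createDataFrame(indexed, schema)
}

// Join on the synthetic keys, then drop the helper columns.
val a = addRowId(df1)
val b = addRowId(df2).withColumnRenamed("rowId", "rowId2")
val joined = a.join(b, a("rowId") === b("rowId2")).drop("rowId").drop("rowId2")

The extra shuffle introduced by the join is exactly why this reads as a workaround rather than a neat solution.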
It's clear that appending a column to a data frame is not an easy piece of functionality in a distributed environment, and there may not be a very efficient, neat method for it at all. But I think it's still very important to have this core functionality available, even with performance warnings.