
Spark / Scala: forward fill with last observation

Using Spark 1.4.0, Scala 2.10

I've been trying to figure out a way to forward fill null values with the last known observation, but I don't see an easy way. I would think this is a pretty common thing to do, but can't find an example showing how to do this.

I see functions to forward fill NaN with a value, or lag / lead functions to fill or shift data by an offset, but nothing to pick up the last known value.

Looking online, I see lots of Q/A about the same thing in R, but not in Spark / Scala.

I was thinking about mapping over a date range, filtering the NaNs out of the results, and picking the last element, but I guess I'm confused about the syntax.

Using DataFrames, I tried something like:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("test.csv")

val df2 = df.withColumn("testForwardFill",
  (90 to 0).map(i => lag(df.col("myValue"), i, 0).over(spec))
    .filter(p => p.getItem.isNotNull)
    .last)

but that doesn't get me anywhere.

The filter part doesn't work: the map call returns a Seq of spark.sql.Column, but the filter predicate needs to return a Boolean, so I'd need to get a value out of the Column to test on, yet there only seem to be Column methods that return another Column.
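
For example (a hypothetical one-liner, just to illustrate the point), isNotNull gives back another Column rather than a Boolean usable by Scala's filter:

// isNotNull builds an expression that Spark evaluates per row later on;
// it does not hand Scala a Boolean, so it can't drive filter on a Seq[Column]
val notNullExpr: org.apache.spark.sql.Column = df.col("myValue").isNotNull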

Is there any way to do this more 'simply' on Spark?

Thanks for your input

Edit:

Sample input:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...

Expected output:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22

Note:

  1. I have many columns, many of which have this missing data pattern, but not at the same date/time. If I need to, I will do the transform one column at a time.

EDIT:

Following @zero323's answer, I tried this:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd

    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }

    val toCarry: scala.collection.Map[Int, Option[Row]] = rows
      .mapPartitionsWithIndex { case (i, iter) =>
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap

    val toCarryBd = sc.broadcast(toCarry)

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1)))
      else iter
    }

    val imputed: RDD[Row] = rows.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }

The broadcast variable ends up as a list of values without nulls. That's progress, but I still can't get the mapping to work: I get nothing, because the index i in fill doesn't map to the original data; it maps to the subset without nulls.

What am I missing here?

EDIT and solution (as inferred from @zero323's answer):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

val spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("test.csv")

val df2 = df.withColumn("test", coalesce((0 to 90).map(i => lag(df.col("test"), i, 0).over(spec)): _*))

See zero323's answer below for more options if you're using RDDs instead of DataFrames. The solution above may not be the most efficient, but it works for me; if you need to optimize, check out the RDD-based approach.


1 Reply


Initial answer (assuming a single time series):

First of all, try to avoid window functions when you cannot provide a PARTITION BY clause. Without it, all the data is moved to a single partition, so most of the time it is simply not feasible.

What you can do instead is fill the gaps on an RDD using mapPartitionsWithIndex. Since you didn't provide example data or expected output, consider this pseudocode rather than a real Scala program:

  • first, let's order the DataFrame by date and convert it to an RDD

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    val rows: RDD[Row] = df.orderBy($"Date").rdd
    
  • next, let's find the last non-null observation per partition

    def notMissing(row: Row): Boolean = ???
    
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) => 
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap
    
  • and convert this Map to a broadcast variable

    val toCarryBd = sc.broadcast(toCarry)
    
  • finally, map over the partitions once again, filling the gaps:

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // If it is the beginning of partition and value is missing
      // extract value to fill from toCarryBd.value
      // Remember to correct for empty / only missing partitions
      // otherwise take last not-null from the current partition
    }
    
    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
    
  • finally, convert back to a DataFrame (a complete sketch of these steps follows below)
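
Putting the steps above together, here is a minimal sketch of what fill could look like, assuming (as in the sample data) that Date sits at column index 0 and the value to carry forward at column index 1; rows with no earlier observation at all simply stay null:

def notMissing(row: Row): Boolean = !row.isNullAt(1)

def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
  // Last non-null observation from all partitions before this one, if any.
  // Looking back over every earlier partition also covers empty or
  // all-missing partitions.
  val carried: Option[Any] = (0 until i)
    .flatMap(idx => toCarryBd.value.get(idx))
    .flatten
    .lastOption
    .map(_.get(1))

  var last: Option[Any] = carried
  iter.map { row =>
    if (notMissing(row)) { last = Some(row.get(1)); row }
    else Row(row.get(0), last.orNull)  // keep Date, substitute the carried value
  }
}

val imputed: RDD[Row] = rows.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
val dfFilled = sqlContext.createDataFrame(imputed, df.schema)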

Edit (partitioned / time series per group data):

The devil is in the detail. If your data is partitioned after all, then the whole problem can be solved using groupBy. Let's assume you simply partition by a column "k" of type T and that Date is an integer timestamp:

def fill(iter: List[Row]): List[Row] = {
  // Just go row by row and fill with last non-empty value
  ???
}

val groupedAndSorted = df.rdd
  .groupBy(_.getAs[T]("k"))
  .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))

val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)

val dfFilled = sqlContext.createDataFrame(rows, df.schema)

This way you can fill all columns at the same time.
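
One possible way to flesh out fill here (a sketch, not necessarily the intended implementation) is to track the last non-null value seen for every column while walking the sorted rows; leading nulls in a group simply stay null:

def fill(rows: List[Row]): List[Row] = {
  val numCols = rows.headOption.map(_.length).getOrElse(0)
  val lastSeen = Array.fill[Any](numCols)(null)  // last non-null value per column

  rows.map { row =>
    val values = (0 until numCols).map { j =>
      if (row.isNullAt(j)) lastSeen(j)           // carry the previous observation
      else { lastSeen(j) = row.get(j); row.get(j) }
    }
    Row.fromSeq(values)
  }
}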

Can this be done with DataFrames instead of converting back and forth to RDD?

It depends, although it is unlikely to be efficient. If the maximum gap is relatively small, you can do something like this:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column

val maxGap: Int = ???  // Maximum gap between observations
val columnsToFill: List[String] = ???  // List of columns to fill
val suffix: String = "_"  // To disambiguate between original and imputed columns

val w: WindowSpec = ???  // Window specification, e.g. Window.partitionBy("k").orderBy("Date")

// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(maxGap: Int)(suffix: String)(c: String) = {
  // Generate lag values between 1 and maxGap
  val lags = (1 to maxGap).map(lag(col(c), _).over(w))
  // Add the current value, coalesce and set an alias
  coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}

// For each column you want to fill, apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)(suffix))

// Finally select
val dfImputed = df.select($"*" :: lags: _*)

It can easily be adjusted to use a different maximum gap per column.
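
For instance, with a hypothetical map of per-column gaps (the column names below are made up for illustration):

val maxGaps: Map[String, Int] = Map("temperature" -> 7, "pressure" -> 30)  // hypothetical gaps

val lagsPerColumn: List[Column] =
  maxGaps.toList.map { case (c, gap) => makeCoalesce(w)(gap)(suffix)(c) }

val dfImputedPerColumn = df.select($"*" :: lagsPerColumn: _*)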

A simpler way to achieve a similar result in more recent Spark versions is to use last with ignoreNulls:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"k").orderBy($"Date")
  .rowsBetween(Window.unboundedPreceding, -1)

df.withColumn("value", coalesce($"value", last($"value", true).over(w)))

While it is possible to drop the partitionBy clause and apply this method globally, it would be prohibitively expensive with large datasets.
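
As a quick sanity check, here is what this looks like on the sample data from the question. This assumes a recent Spark session (2.1+) with spark.implicits._ in scope, so it will not run on the Spark 1.4 setup above; with a single series the partitionBy clause is dropped, which is fine for a toy example:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val sample = Seq(
  ("2015-06-01", Some(33)), ("2015-06-02", None), ("2015-06-03", None),
  ("2015-06-04", None), ("2015-06-05", Some(22)), ("2015-06-06", None),
  ("2015-06-07", None)
).toDF("Date", "value")

// All rows strictly before the current one, in Date order
val w = Window.orderBy($"Date").rowsBetween(Window.unboundedPreceding, -1)

sample
  .withColumn("value", coalesce($"value", last($"value", true).over(w)))
  .show()
// Fills 2015-06-02..04 with 33 and 2015-06-06..07 with 22,
// matching the expected output in the question.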

