Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.1k views
in Technique[技术] by (71.8m points)

scala - joda DateTime format cause null pointer error in spark RDD functions

The exception message as following

User class threw exception: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 11, 10.215.155.82): java.lang.NullPointerException at org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143) at org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103) at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:676) at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:521) at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:625) at org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:328) at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41) at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41) at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:328) at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:113) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:28) at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:327) at org.apache.spark.util.collection.CompactBuffer.groupBy(CompactBuffer.scala:28) at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:41) at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:40) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081) at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)

My code as following:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.{ SparkConf, SparkContext }
import org.joda.time.DateTime
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter }




object DateTimeNullReferenceReappear extends App {

  case class Record(uin: String = "", date: DateTime = null, value: Double = 0.0) 

  val cfg = new Configuration
  val sparkConf = new SparkConf()
  sparkConf.setAppName("bourne_exception_reappear")
  val sc = new SparkContext(sparkConf)

val data = TDWSparkContext.tdwTable(   // this function just read data from an data warehouse
  sc,
  tdwuser = FaceConf.TDW_USER,
  tdwpasswd = FaceConf.TDW_PASSWORD,
  dbName = "my_db",
  tblName = "my_table",
  parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
  .map(row => {
    Record(uin = row(2),
      date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
      value = row(4).toDouble)
  }).map(x => (x.uin, (x.date, x.value)))
  .groupByKey
  .map(x => {
    x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum)   // throw exception here
  })

//      val data = TDWSparkContext.tdwTable(  // It works, as I don't user datetime toString in the groupBy 
//      sc,
//      tdwuser = FaceConf.TDW_USER,
//      tdwpasswd = FaceConf.TDW_PASSWORD,
//      dbName = "hy",
//      tblName = "t_dw_cf_oss_tblogin",
//      parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
//      .map(row => {
//        Record(uin = row(2),
//          date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
//          value = row(4).toDouble)
//      }).map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value)))
//      .groupByKey
//      .map(x => {
//        x._2.groupBy(_._1).mapValues(_.map(_._2).sum)
//      })

  data.take(10).map(println)

}

So, it seems that call toString in the groupBy cause the exception, so can anybody explain it?

Thanks

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You need to either disable Kryo, use Kryo JodaTime Serializers, or avoid serializing the DateTime object, i.e. pass around Longs.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...