Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Scala - Spark: How to use mapPartitions and create/close a connection per partition

I want to perform certain operations on my Spark DataFrame, write the rows to a DB, and create another DataFrame at the end. It looks like this:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    iterator.map(
       row => {
         addRowToBatch(row)
         convertRowToObject(row)
     })
    conn.writeTheBatchToDB()
    conn.close()
  })
  .toDF()

This gives me an error, because mapPartitions expects a return type of Iterator[NotInferedR], but here the block returns Unit. I know this is possible with foreachPartition, but I'd like to do the mapping as well. Doing it separately would add overhead (an extra Spark job). What should I do?

Thanks!
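
A side note on the snippet in the question, as a minimal sketch in plain Scala (no Spark required): `Iterator.map` is lazy, so even if the block returned the mapped iterator instead of Unit, `conn.close()` would still run before any row had been processed. `FakeConn`, `naive` and `LazyMapDemo` are hypothetical names made up for this illustration; they only mimic the shape of the code above.

```scala
object LazyMapDemo {
  // Hypothetical stand-in for a DB connection; it only records what happened.
  final class FakeConn {
    var closed = false
    var writesAfterClose = 0
    def write(x: Int): Unit = if (closed) writesAfterClose += 1
    def close(): Unit = closed = true
  }

  // Same shape as the question's code, but returning the mapped iterator.
  def naive(rows: Iterator[Int], conn: FakeConn): Iterator[Int] = {
    val mapped = rows.map { r => conn.write(r); r * 2 } // lazy: nothing runs yet
    conn.close()                                        // runs before any row is mapped
    mapped
  }

  def main(args: Array[String]): Unit = {
    val conn = new FakeConn
    val out = naive(Iterator(1, 2, 3), conn).toList // rows are only processed here
    println(s"result=$out, writes after close=${conn.writesAfterClose}")
  }
}
```

In this sketch every write happens after the connection has been closed, which is why the cleanup has to be tied to the point where the iterator is actually exhausted.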


1 Reply


In most cases, eagerly consuming the iterator will cause the job to fail, or at least slow it down. So what I did instead was to check whether the iterator is already empty and run the cleanup routines at that point.

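rdd.mapPartitions { itr =>
  if (itr.isEmpty) Iterator.empty // empty partition: no connection needed
  else {
    val conn = new DbConnection
    itr.map { data =>
      // process is a placeholder: do something with your data and conn here
      val yourActualResult = process(data, conn)
      if (itr.isEmpty) conn.close() // last element reached: close the connection
      yourActualResult
    }
  }
}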

I thought this was a Spark problem at first, but it was actually a Scala one. http://www.scala-lang.org/api/2.12.0/scala/collection/Iterator.html#isEmpty:Boolean
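
A variation on the same idea, sketched in plain Scala so it can be run without Spark: the cleanup can be moved into a small wrapper iterator that fires once, the first time `hasNext` reports false. `ClosingIterator`, `FakeConn` and `processPartition` are hypothetical names for this illustration and are not part of Spark or the Scala library.

```scala
object ClosingIteratorDemo {
  // Hypothetical stand-in for a DB connection; counts how often close() is called.
  final class FakeConn {
    var closeCount = 0
    def close(): Unit = closeCount += 1
  }

  // Wraps an iterator and runs `cleanup` once, when hasNext first returns false.
  final class ClosingIterator[A](underlying: Iterator[A], cleanup: () => Unit)
      extends Iterator[A] {
    private var done = false
    def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !done) { done = true; cleanup() }
      more
    }
    def next(): A = underlying.next()
  }

  // Shape of a function that could be passed to mapPartitions.
  def processPartition(rows: Iterator[Int], conn: FakeConn): Iterator[Int] =
    new ClosingIterator(rows.map(_ * 2), () => conn.close())

  def main(args: Array[String]): Unit = {
    val conn = new FakeConn
    val out = processPartition(Iterator(1, 2, 3), conn).toList
    println(s"result=$out, closeCount=${conn.closeCount}")
  }
}
```

This sketch also closes the connection for a partition with no rows. It still assumes the consumer drains the iterator: if iteration stops early or an exception is thrown, the cleanup never runs. In Spark, registering the cleanup with `TaskContext.get().addTaskCompletionListener` is one way to cover those cases.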

