Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
180 views
in Technique[技术] by (71.8m points)

How do I write a Dataset encoder to support mapping a function to a org.apache.spark.sql.Dataset[String] in Scala Spark

Moving from Spark 1.6 to Spark 2.2* has brought the error “error: Unable to find encoder for type stored in a 'Dataset'. Primitive types (Int, String, etc)” when trying to apply a method to a dataset returned from querying a parquet table. I have oversimplified my code to demonstrate the same error. The code queries a parquet file to return the following datatype: 'org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]' I apply a function to extract a string and integer , returning a string. Returning the following datatype: Array[String] Next, I need to perform extensive manipulations requiring a separate function. In this test function, I try to append a string producing the same error as my detailed example. I have tried some encoder examples and use of the ‘case’ but have not come up with a workable solution. Any suggestions/ examples would be appreciated

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, 
cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x" 
(1)+","+s.getDecimal(1);

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d2.take(1)
20/03/25 19:01:08 WARN TaskSetManager: Stage 3 contains a task of very large size (131 KB). The 
maximum recommended task size is 100 KB.
res10: Array[String] = Array(ab04006000504304,1522194407.95162)

scala> def dd(s:String){
 | s + "some string"
 | }
dd: (s: String)Unit

scala> var d3 = d2.map{s=> dd(s) }
<console>:47: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support 
for serializing other types will be added in future releases.

To distill the problem further, i believe this scenario (though I have not tried all possible solutions to) can be simplified further to the following code:

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String){
 | s + "hi"
 | }
f: (s: String)Unit

scala> var test2 = test.map{ s => f(s) }
<console>:42: error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other types 
will be added in future releases.
   var test2 = test.map{ s => f(s) }
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I have a solution at least to my simplified Problem (below). I will be testing more....

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 | val r = s + "hi"
 | return r
 | }
f: (s: String)String

scala> var test2 = test.rdd.map{ s => f(s) }
test2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at map at <console>:43

scala> test2.take(1)
res9: Array[String] = Array(just some wordshi)

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...