Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


scala - Spark DataFrames: udf does not accept enough input arguments

I am preparing a DataFrame with an id and a vector of my features, to be used later for predictions. I do a groupBy on my DataFrame, and inside the aggregation I collect several columns as lists and then merge them into a new column:

def mergeFunction(...) // with 14 input variables

val myudffunction = udf(mergeFunction _) // Spark doesn't support this (too many arguments)

df.groupBy("id").agg(
   collect_list(df(...)) as ...,
   ... // too many of these (something like 14 of them)
).withColumn("feature_labels",
  myudffunction(
     col(...),
     col(...)))
.select("id", "feature_labels")

This is how I create my feature vectors and their labels. It has worked for me so far, but this is the first time this method needs more than 10 inputs, which is the maximum a udf function in Spark accepts.

I am not sure how else I can fix this. Is the number of inputs a udf accepts going to increase in Spark, have I misunderstood how udfs work, or is there a better way?


1 Reply


User-defined functions are supported for up to 22 parameters. Only the udf helper is limited to at most 10 arguments. To handle functions with a larger number of parameters, you can use org.apache.spark.sql.UDFRegistration (available as spark.udf).

For example

val dummy = ((
  x0: Int, x1: Int, x2: Int, x3: Int, x4: Int, x5: Int, x6: Int, x7: Int, 
  x8: Int, x9: Int, x10: Int, x11: Int, x12: Int, x13: Int, x14: Int, 
  x15: Int, x16: Int, x17: Int, x18: Int, x19: Int, x20: Int, x21: Int) => 1)

can be registered:

import org.apache.spark.sql.expressions.UserDefinedFunction

val dummyUdf: UserDefinedFunction = spark.udf.register("dummy", dummy)

and used directly:

import org.apache.spark.sql.functions.lit

val df = spark.range(1)
// 22 literal columns, one per UDF parameter
val exprs = (0 to 21).map(_ => lit(1))

df.select(dummyUdf(exprs: _*))

or by name via callUDF:

import org.apache.spark.sql.functions.callUDF

df.select(
  callUDF("dummy", exprs:  _*).alias("dummy")
)

or SQL expression:

df.selectExpr(s"""dummy(${Seq.fill(22)(1).mkString(",")})""")

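You can also create a UserDefinedFunction object directly (this relies on UserDefinedFunction being a case class, as in Spark 2.x; as far as I know, Spark 3.x no longer exposes this constructor):

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.IntegerType
import spark.implicits._ // needed for toDF

Seq(1).toDF.select(UserDefinedFunction(dummy, IntegerType, None)(exprs: _*))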

In practice, a function with 22 arguments is not very useful, and unless you want to use Scala reflection to generate such functions, they are a maintenance nightmare.

I would consider either passing a collection (array, map) or a struct as the input, or splitting the function into multiple modules. For example:

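import org.apache.spark.sql.functions.{array, lit, udf}

// A single array column can carry any number of values into the UDF
val aLongArray = array((0 to 256).map(_ => lit(1)): _*)

val udfWitharray = udf((xs: Seq[Int]) => 1)

Seq(1).toDF.select(udfWitharray(aLongArray).alias("dummy"))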
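The struct variant could look roughly like the sketch below, applied to a groupBy / collect_list pipeline similar to the one in the question. The column names (a, b, c), the sample data, and the mergeLists helper are hypothetical stand-ins, since the question elides the real ones; the merge logic here just concatenates the lists. It assumes a Spark 2.x or later session and relies on a struct column arriving in a Scala UDF as a Row.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct, udf}

object StructUdfSketch {
  // Hypothetical merge logic, kept free of Spark types so it can be tested on its own.
  // It simply concatenates the per-column lists in the order given.
  def mergeLists(lists: Seq[Seq[Double]]): Seq[Double] = lists.flatten

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("struct-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical input: an id plus three numeric columns
    // (stand-ins for the ~14 columns mentioned in the question).
    val df = Seq((1, 1.0, 2.0, 3.0), (1, 4.0, 5.0, 6.0), (2, 7.0, 8.0, 9.0))
      .toDF("id", "a", "b", "c")

    val featureCols = Seq("a", "b", "c")

    // The UDF takes a single Row argument, so the number of struct fields
    // is not bounded by the arity of the udf helper.
    val mergeUdf = udf { (r: Row) =>
      mergeLists((0 until r.length).map(i => r.getSeq[Double](i)))
    }

    // One collect_list per feature column, built from the list of names.
    val aggregated = df.groupBy("id").agg(
      collect_list(col(featureCols.head)).as(featureCols.head),
      featureCols.tail.map(c => collect_list(col(c)).as(c)): _*
    )

    // Pack all aggregated columns into one struct and pass it to the UDF.
    val result = aggregated
      .withColumn("feature_labels", mergeUdf(struct(featureCols.map(col): _*)))
      .select("id", "feature_labels")

    result.show(false)
    spark.stop()
  }
}
```

Adding another feature then only means adding a name to featureCols; the UDF signature stays the same. Note that collect_list makes no promise about element order within a group, so a real merge function should not depend on it.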
