Problem
I would like to create a User-Defined Function in Java that can be called as a Java method within a chain of Apache Spark operators. I'm having trouble finding Java examples that don't require the UDF to exist inside of a SQL query.
Versions
- Java 8
- Scala 2.10.6
- Apache Spark 1.6.0 Pre-built for Hadoop 2.6.0
What I've Tried That Works
I can successfully create a UDF in Java. However, I can't use this unless it's in a SQL query:
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
sqlContext.udf().register("udfUppercase",
(String string) -> string.toUpperCase(), DataTypes.StringType);
DataFrame oldDF = // a simple DataFrame with a "name" column
oldDF.registerTempTable("df");
DataFrame newDF = sqlContext.sql("SELECT udfUppercase(name) AS name_upper FROM df");
Where I'm Stuck
I would expect a non-SQL method-call-style UDF in Java to look something like this:
import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
UserDefinedFunction udfUppercase = udf(
(String string) -> string.toUpperCase(), DataTypes.StringType);
DataFrame oldDF = // a simple DataFrame with a "name" column
newDF = oldDF.withColumn("name_upper", udfUppercase(oldDF.col("name")));
Compiling this leads to a compiler error on the line beginning with "UserDefinedFunction", so obviously my attempt at guessing the right signature is incorrect:
error: no suitable method found for udf((String st[...]ase(),DataType)
UserDefinedFunction udfUppercase = udf((String string) -> string.toUpperCase(), DataTypes.StringType);
method functions.<RT#1>udf(Function0<RT#1>,TypeTags.TypeTag<RT#1>) is not applicable
(cannot infer type-variable(s) RT#1
(argument mismatch; Function0 is not a functional interface
multiple non-overriding abstract methods found in interface Function0))
This error continues with detail for each of the inferred udf() signatures attempted.
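The "is not a functional interface" part of that message can be reproduced without Spark. Below is a minimal plain-Java sketch; the names LambdaTargetDemo, SingleMethod, and TwoMethods are hypothetical and are not part of Spark or Scala. It assumes only what the error states: javac sees more than one abstract method on scala.Function0, so a lambda cannot be converted to it. The error also shows that this udf() overload takes a TypeTag rather than a DataType, so the second argument would not match either.

```java
public class LambdaTargetDemo {

    // Hypothetical interface with exactly one abstract method:
    // a valid target type for a Java 8 lambda.
    @FunctionalInterface
    public interface SingleMethod<T, R> {
        R call(T value);
    }

    // Hypothetical interface with two abstract methods. It stands in for
    // the way javac reports scala.Function0 in the error above, and it is
    // NOT a valid lambda target.
    public interface TwoMethods<T, R> {
        R apply(T value);
        String describe();
    }

    public static String applySingle(SingleMethod<String, String> f, String input) {
        return f.call(input);
    }

    public static String applyTwo(TwoMethods<String, String> f, String input) {
        return f.apply(input);
    }

    // Compiles: the lambda is converted to the single abstract method.
    public static String viaLambda(String input) {
        return applySingle((String s) -> s.toUpperCase(), input);
    }

    // applyTwo((String s) -> s.toUpperCase(), input) would be rejected by
    // javac because TwoMethods is not a functional interface, so every
    // abstract method has to be implemented explicitly.
    public static String viaAnonymousClass(String input) {
        return applyTwo(new TwoMethods<String, String>() {
            @Override
            public String apply(String value) {
                return value.toUpperCase();
            }

            @Override
            public String describe() {
                return "uppercase";
            }
        }, input);
    }

    public static void main(String[] args) {
        System.out.println(viaLambda("spark"));
        System.out.println(viaAnonymousClass("spark"));
    }
}
```

This only illustrates the lambda-conversion rule. It is not a way to make functions.udf() accept a Java lambda in Spark 1.6.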
What I Need
I need to fix the Java code so that I can define and use the udfUppercase UDF without embedding it in a SQL query. I feel like I'm missing something very simple, fundamental, and possibly syntax-y, but could be completely off base.
Working Solution (courtesy of zero323 below)
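In Spark 1.6 there's no good way to define a Java UDF and invoke it directly as a Java method: as the signatures in the compiler error show, functions.udf() expects Scala function types and TypeTags, not a Java lambda and a DataType. However, a UDF registered by name in the SQLContext can be inserted into a chain of operators using callUDF().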
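import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
// Register the UDF by name; the name is what callUDF() looks up later.
sqlContext.udf().register("udfUppercase",
(String string) -> string.toUpperCase(), DataTypes.StringType);
DataFrame oldDF = // a simple DataFrame with a "name" column
// Invoke the registered UDF by name inside the operator chain, no SQL string needed.
DataFrame newDF = oldDF.withColumn("name_upper", callUDF("udfUppercase", oldDF.col("name")));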
Also, be sure to use callUDF() and not the deprecated callUdf(), which has a different method signature.