The answer for this one was tricky, but thanks to samklr, I managed to figure out what the problem was.
The solution isn't straightforward, though, and might involve some “unnecessary” transformations.
First, let's talk about serialization.
There are two aspects of serialization to consider in Spark: serialization of data and serialization of functions. In this case, it's about data serialization and thus de-serialization.
From Spark’s perspective, the only thing required is setting up serialization. Spark relies by default on Java serialization, which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types, namely Writables. As such, InputFormats and OutputFormats are required to return Writables which, out of the box, Spark does not understand.
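To make the "convenient but fairly inefficient" point concrete, here is a minimal sketch using only the JDK. It compares default Java serialization of a tiny object with a hand-written encoding that writes just the fields, which is roughly the idea behind Writables. The SamplePoint and SerializationSizeDemo classes are hypothetical names made up for this illustration; they are not part of Spark or Hadoop.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical value type, used only for this illustration.
class SamplePoint implements Serializable {
    private static final long serialVersionUID = 1L;
    final int x;
    final int y;

    SamplePoint(int x, int y) {
        this.x = x;
        this.y = y;
    }
}

class SerializationSizeDemo {
    // Number of bytes produced by default Java serialization
    // (stream header, class descriptor and field values).
    static int javaSerializedSize(SamplePoint p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Number of bytes produced when only the two int fields are written.
    static int compactSize(SamplePoint p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(p.x);
            out.writeInt(p.y);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        SamplePoint p = new SamplePoint(3, 4);
        System.out.println("Java serialization: " + javaSerializedSize(p) + " bytes");
        System.out.println("Compact encoding:   " + compactSize(p) + " bytes");
    }
}
```

The compact form is always 8 bytes here (two ints), while the Java-serialized form also carries class metadata, so it is noticeably larger for small objects.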
With the elasticsearch-spark connector one must enable a different serialization (Kryo) which handles the conversion automatically and also does this quite efficiently.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
Kryo does not require that a class implement a particular interface to be serialized, which means POJOs can be used in RDDs without any further work beyond enabling Kryo serialization.
That said, @samklr pointed out to me that classes need to be registered with Kryo before they are used.
This is because Kryo writes a reference to the class of the object being serialized (one reference is written for every object written), which is just an integer identifier if the class has been registered but is the full classname otherwise. Spark registers Scala classes and many other framework classes (like Avro Generic or Thrift classes) on your behalf.
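The size difference between a registered and an unregistered class reference can be pictured with a toy model. This is only a sketch of the idea and is NOT Kryo's actual wire format or API: the ClassTagDemo class, its one-byte marker and its fixed 4-byte ID are assumptions made up for the illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Toy model of "integer ID if registered, full class name otherwise".
// Hypothetical code for illustration; it does not reproduce Kryo's encoding.
class ClassTagDemo {
    private final Map<String, Integer> registered = new HashMap<>();

    // Assign the next free integer ID to a class name.
    void register(String className) {
        registered.put(className, registered.size());
    }

    // Number of bytes needed to write the class reference for one object.
    int tagSize(String className) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Integer id = registered.get(className);
            out.writeBoolean(id != null);   // marker: registered or not
            if (id != null) {
                out.writeInt(id);           // short, fixed-size identifier
            } else {
                out.writeUTF(className);    // full class name, repeated per object
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        String name = "com.example.Product"; // hypothetical class name
        ClassTagDemo demo = new ClassTagDemo();
        System.out.println("unregistered: " + demo.tagSize(name) + " bytes");
        demo.register(name);
        System.out.println("registered:   " + demo.tagSize(name) + " bytes");
    }
}
```

Since the reference is written once per object, the saving from registration adds up quickly over a large RDD.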
Registering classes with Kryo is straightforward. Implement the KryoRegistrator interface and override the registerClasses() method:
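import java.io.Serializable;
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Product POJO associated to a product Row from the DataFrame
        kryo.register(Product.class);
    }
}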
Finally, in your driver program, set the spark.kryo.registrator property to the fully qualified classname of your KryoRegistrator implementation:
conf.set("spark.kryo.registrator", "MyKryoRegistrator");
Secondly, even though the Kryo serializer is set and the class is registered, with the changes made in Spark 1.5, Elasticsearch for some reason couldn't de-serialize the DataFrame, because the connector can't infer the SchemaType of the DataFrame.
So I had to convert the DataFrame to a JavaRDD:
JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
    public Product call(Row row) throws Exception {
        long id = row.getLong(0);
        String title = row.getString(1);
        String description = row.getString(2);
        int merchantId = row.getInt(3);
        double price = row.getDecimal(4).doubleValue();
        String keywords = row.getString(5);
        long brandId = row.getLong(6);
        int categoryId = row.getInt(7);
        return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
    }
});
Now the data is ready to be written into Elasticsearch:
JavaEsSpark.saveToEs(products, "test/test");
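The Product class itself is not shown above. A minimal sketch of what it might look like, assuming a plain JavaBean whose constructor matches the call in the map function, is given here. The field names and types are inferred from the Row accessors used above, and the getters are an assumption on my part (bean-style accessors are what I would expect the connector to read), so adapt it to your actual schema.

```java
import java.io.Serializable;

// Hypothetical POJO for one product row; the fields mirror the columns
// read from the Row in the conversion code (an assumption, not the original class).
class Product implements Serializable {
    private static final long serialVersionUID = 1L;

    private final long id;
    private final String title;
    private final String description;
    private final int merchantId;
    private final double price;
    private final String keywords;
    private final long brandId;
    private final int categoryId;

    Product(long id, String title, String description, int merchantId,
            double price, String keywords, long brandId, int categoryId) {
        this.id = id;
        this.title = title;
        this.description = description;
        this.merchantId = merchantId;
        this.price = price;
        this.keywords = keywords;
        this.brandId = brandId;
        this.categoryId = categoryId;
    }

    // Bean-style getters, one per field.
    public long getId() { return id; }
    public String getTitle() { return title; }
    public String getDescription() { return description; }
    public int getMerchantId() { return merchantId; }
    public double getPrice() { return price; }
    public String getKeywords() { return keywords; }
    public long getBrandId() { return brandId; }
    public int getCategoryId() { return categoryId; }
}
```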
References:
- Elasticsearch's Apache Spark support documentation.
- Hadoop: The Definitive Guide, 4th edition, Chapter 19 (Spark), Tom White.
- User samklr.