Using own jackson versions in Flink causes VerifyError

I'm using Apache Flink (v1.11) with Scala and added my own DeserializationSchema for the Kafka connector. For this I would like to use my own packages and versions of Jackson (v2.12.0).

But I get the following error:

Exception in thread "main" java.lang.VerifyError: Cannot inherit from final class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at com.fasterxml.jackson.dataformat.csv.CsvMapper.<init>(CsvMapper.java:108)
    at de.integration_factory.datastream.types.CovidEventSchema.<init>(CovidEventSchema.scala:14)
    at de.integration_factory.datastream.Aggregate_Datastream$.main(Aggregate_Datastream.scala:34)
    at de.integration_factory.datastream.Aggregate_Datastream.main(Aggregate_Datastream.scala)

This is my EventSchema:

import java.io.IOException

import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.datatype.joda.JodaModule
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation

@SerialVersionUID(6154188370181669758L)
class CovidEventSchema extends DeserializationSchema[CovidEvent] with SerializationSchema[CovidEvent] {

  private val mapper = new CsvMapper
  mapper.registerModule(new JodaModule)

  val csvSchema = mapper
    .schemaFor(classOf[CovidEvent])
    .withLineSeparator(",")
    .withoutHeader()
  val reader = mapper.readerWithSchemaFor(classOf[CovidEvent])

  def serialize(event: CovidEvent): Array[Byte] = mapper.writer(csvSchema).writeValueAsBytes(event)

  @throws[IOException]
  def deserialize(message: Array[Byte]): CovidEvent = reader.readValue[CovidEvent](message)


  def isEndOfStream(nextElement: CovidEvent): Boolean = false

  def getProducedType: TypeInformation[CovidEvent] = TypeInformation.of(classOf[CovidEvent])
}

This is my PoJo for schema:

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.joda.time.DateTime;


@Data
@NoArgsConstructor
@AllArgsConstructor
public class CovidEvent {

    private long objectId;
    private int bundeslandId;
    private String bundesland;
    private String landkreis;
    private String altersgruppe;
    private String geschlecht;
    private int anzahlFall;
    private int anzahlTodesfall;
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "UTC")
    private DateTime meldedatum;
    private int landkreisId;
    private String datenstand;
    private int neuerFall;
    private int neuerTodesfall;
    private String refDatum;
    private int neuGenesen;
    private int anzahlGenesen;
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private boolean istErkrankungsbeginn;
    private String altersGruppe2;

    public long getEventtime() {
        return meldedatum.getMillis();
    }

}

After some research I found out that the error is probably caused by different Jackson versions on the classpath.

I thought it would be possible to use my own version of Jackson, because Flink shades its own version.

What am I doing wrong?

UPDATE: If I import the Jackson classes from the shaded Flink package, it works:

org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper

But then I am dependent on the Jackson version shaded by Flink.
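One way to keep a private Jackson version would be to relocate its packages in the fat jar, the same way Flink shades its copy. An untested sketch, assuming an sbt build with the sbt-assembly plugin (the target package my.shaded.jackson is an arbitrary name of mine; with Maven, the maven-shade-plugin offers equivalent relocations):

// build.sbt: relocate our own Jackson packages so they cannot clash
// with any other Jackson copy on the classpath (needs sbt-assembly)
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.fasterxml.jackson.**" -> "my.shaded.jackson.@1").inAll
)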



1 Reply


It would work if Flink's classloaders were used. However, the way your setup works, you are loading your user code in the system classloader while the whole DataStream application is being created. I won't go into more detail here (unless requested in a follow-up) and will go straight to the solution:

Your DeserializationSchema should never initialize heavy resources during creation (this happens on the client or job manager side), but only in open() (which happens on the task manager). So please move

private val mapper = new CsvMapper
mapper.registerModule(new JodaModule)

into open().
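For illustration, a minimal sketch of the refactored schema, assuming Flink 1.11's open(InitializationContext) hooks on DeserializationSchema and SerializationSchema (added with FLIP-124); the initMapper helper name is mine, and the Jackson objects live in @transient fields so they are rebuilt on the task manager instead of being serialized with the schema:

import java.io.IOException

import com.fasterxml.jackson.databind.ObjectReader
import com.fasterxml.jackson.dataformat.csv.{CsvMapper, CsvSchema}
import com.fasterxml.jackson.datatype.joda.JodaModule
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation

@SerialVersionUID(6154188370181669758L)
class CovidEventSchema extends DeserializationSchema[CovidEvent] with SerializationSchema[CovidEvent] {

  // Transient: never shipped with the serialized schema, always rebuilt in open()
  @transient private var mapper: CsvMapper = _
  @transient private var csvSchema: CsvSchema = _
  @transient private var reader: ObjectReader = _

  // Called on the task manager before the first record is processed
  override def open(context: DeserializationSchema.InitializationContext): Unit = initMapper()

  override def open(context: SerializationSchema.InitializationContext): Unit = initMapper()

  private def initMapper(): Unit = {
    mapper = new CsvMapper
    mapper.registerModule(new JodaModule)
    csvSchema = mapper
      .schemaFor(classOf[CovidEvent])
      .withLineSeparator(",")
      .withoutHeader()
    reader = mapper.readerWithSchemaFor(classOf[CovidEvent])
  }

  def serialize(event: CovidEvent): Array[Byte] =
    mapper.writer(csvSchema).writeValueAsBytes(event)

  @throws[IOException]
  def deserialize(message: Array[Byte]): CovidEvent =
    reader.readValue[CovidEvent](message)

  def isEndOfStream(nextElement: CovidEvent): Boolean = false

  def getProducedType: TypeInformation[CovidEvent] = TypeInformation.of(classOf[CovidEvent])
}

Because the fields are @transient, nothing Jackson-related has to be Serializable anymore, and the classes are only loaded where the task actually executes.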

It only works with the bundled version because, luckily for you, ObjectMapper implements Serializable. That is rarely the case for parsers, and it is actually completely unnecessary if the deserializer is initialized correctly.

