First of all, here is the exception I am getting:
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:77)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:293)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Thread.java:830)
When I tried to set the TimeCharacteristic to EventTime:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
I found that the method is deprecated:
* @deprecated In Flink 1.12 the default stream time characteristic has been changed to {@link
* TimeCharacteristic#EventTime}, thus you don't need to call this method for enabling
* event-time support anymore. Explicitly using processing-time windows and timers works in
* event-time mode. If you need to disable watermarks, please use {@link
* ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link
* TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link
* WatermarkStrategy}. If you are using generic "time window" operations (for example {@link
* org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)}
* that change behaviour based on the time characteristic, please use equivalent operations
* that explicitly specify processing time or event time
And I did not forget to call DataStream.assignTimestampsAndWatermarks(...):
DataStream<SensorReading> dataStream = inputStream
        .map(data -> {
            String[] fields = data.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        })
        .assignTimestampsAndWatermarks(new MyWaterMark());
private static class MyWaterMark implements WatermarkStrategy<SensorReading> {
    @Override
    public WatermarkGenerator<SensorReading> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<SensorReading>() {
            private long bound = 3000L;
            private long maxTimestamp = 0;

            @Override
            public void onEvent(SensorReading event, long eventTimestamp,
                    WatermarkOutput output) {
                maxTimestamp = Math.max(eventTimestamp, event.getTimestamp());
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(new Watermark(maxTimestamp == Long.MIN_VALUE ? 0 : (maxTimestamp - bound)));
            }
        };
    }
}
What should I do?
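One thing that may be worth checking: as far as I understand, a WatermarkStrategy that only overrides createWatermarkGenerator leaves each record with whatever timestamp it already carries, which is Long.MIN_VALUE when nothing has assigned one. That would match the exception above. The sketch below is plain Java with no Flink dependency, so it only illustrates the two separate jobs involved (reading the event time out of the record, then deriving a watermark from the largest event time seen). Every name in it (WatermarkSketch, Reading, BoundedOutOfOrderness, assignTimestamp, watermarkAfter) is hypothetical and not part of Flink.

```java
import java.util.List;

// Plain-Java illustration only; none of these types are Flink classes.
class WatermarkSketch {

    // Per the exception message, Long.MIN_VALUE is the "no timestamp" marker.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Hypothetical stand-in for the SensorReading class used in the question.
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    // Job 1: a timestamp assigner reads the event time from the record itself
    // and ignores the timestamp (possibly NO_TIMESTAMP) attached so far.
    static long assignTimestamp(Reading reading, long recordTimestamp) {
        return reading.timestamp;
    }

    // Job 2: a watermark generator keeps the running maximum event time and
    // reports a watermark that trails it by a fixed bound.
    static final class BoundedOutOfOrderness {
        private final long boundMillis;
        private long maxTimestamp;

        BoundedOutOfOrderness(long boundMillis) {
            this.boundMillis = boundMillis;
            // Start high enough that the subtraction below cannot underflow.
            this.maxTimestamp = Long.MIN_VALUE + boundMillis;
        }

        void onEvent(long eventTimestamp) {
            // Compare against the previous maximum, not only the current event.
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long currentWatermark() {
            return maxTimestamp - boundMillis;
        }
    }

    // Runs every reading through both jobs and returns the final watermark.
    static long watermarkAfter(List<Reading> readings, long boundMillis) {
        BoundedOutOfOrderness generator = new BoundedOutOfOrderness(boundMillis);
        for (Reading reading : readings) {
            long eventTime = assignTimestamp(reading, NO_TIMESTAMP);
            generator.onEvent(eventTime);
        }
        return generator.currentWatermark();
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading("sensor_1", 10_000L, 35.8),
                new Reading("sensor_1", 15_000L, 36.1),
                new Reading("sensor_1", 12_000L, 35.9)); // arrives out of order
        System.out.println(watermarkAfter(readings, 3_000L)); // prints 12000
    }
}
```

In Flink itself, the hook that corresponds to assignTimestamp here appears to be WatermarkStrategy#createTimestampAssigner (or withTimestampAssigner on a built-in strategy), so overriding or supplying it alongside the watermark generator is probably the first thing to try.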