Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
339 views
in Technique[技术] by (71.8m points)

apache flink - flink1.12.0 how to set TimeCharacteristic

First of all, i want show my exception

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:77)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:293)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Thread.java:830)

When I tried to set TimeCharacteristic to EventTime

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

I found:

 * @deprecated In Flink 1.12 the default stream time characteristic has been changed to {@link
 *        TimeCharacteristic#EventTime}, thus you don't need to call this method for enabling
 *      event-time support anymore. Explicitly using processing-time windows and timers works in
 *      event-time mode. If you need to disable watermarks, please use {@link
 *        ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link
 *        TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link
 *        WatermarkStrategy}. If you are using generic "time window" operations (for example {@link
 *        org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)}
 *      that change behaviour based on the time characteristic, please use equivalent operations
 *      that explicitly specify processing time or event time

and i did not forget to call DataStream.assignTimestampsAndWatermarks(...)

DataStream<SensorReading> dataStream = inputStream
            .map(data -> {
                String[] fields = data.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            })
            .assignTimestampsAndWatermarks(new MyWaterMark());

private static class MyWaterMark implements WatermarkStrategy<SensorReading> {

    @Override
    public WatermarkGenerator<SensorReading> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<SensorReading>() {

            private long bound = 3000L;

            private long maxTimestamp = 0;

            @Override
            public void onEvent(SensorReading event, long eventTimestamp,
                    WatermarkOutput output) {
                maxTimestamp = Math.max(eventTimestamp, event.getTimestamp());
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(new Watermark(maxTimestamp == Long.MIN_VALUE ? 0 : (maxTimestamp - bound)));
            }
        };
    }

what should i do?


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
等待大神答复

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...