So what I would do is to add timestamps to each element according to the file path. As a test I used the following example.
First of all, as explained in this answer, you can use FileIO
to match continuously a file pattern. This will help as, per your use case, once you have finished with the backfill you want to keep reading new arriving files within the same job. In this case I provide gs://BUCKET_NAME/data/**
because my files will be like gs://BUCKET_NAME/data/year/month/day/hour/filename.extension
:
p
.apply(FileIO.match()
.filepattern(inputPath)
.continuously(
// Check for new files every minute
Duration.standardMinutes(1),
// Never stop checking for new files
Watch.Growth.<String>never()))
.apply(FileIO.readMatches())
Watch frequency and timeout can be adjusted at will.
Then, in the next step we'll receive the matched file. I will use ReadableFile.getMetadata().resourceId()
to get the full path and split it by "/"
to build the corresponding timestamp. I round it to the hour and do not account for timezone correction here. With readFullyAsUTF8String
we'll read the whole file (be careful if the whole file does not fit into memory, it is recommended to shard your input if needed) and split it into lines. With ProcessContext.outputWithTimestamp
we'll emit downstream a KV of filename and line (the filename is not needed anymore but it will help to see where each file comes from) and the timestamp derived from the path. Note that we're shifting timestamps "back in time" so this can mess up with the watermark heuristics and you will get a message such as:
Cannot output with timestamp 2019-03-17T00:00:00.000Z. Output timestamps must be no earlier than the timestamp of the current input (2019-06-05T15:41:29.645Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
To overcome this I set getAllowedTimestampSkew
to Long.MAX_VALUE
but take into account that this is deprecated. ParDo code:
.apply("Add Timestamps", ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
@Override
public Duration getAllowedTimestampSkew() {
return new Duration(Long.MAX_VALUE);
}
@ProcessElement
public void processElement(ProcessContext c) {
ReadableFile file = c.element();
String fileName = file.getMetadata().resourceId().toString();
String lines[];
String[] dateFields = fileName.split("/");
Integer numElements = dateFields.length;
String hour = dateFields[numElements - 2];
String day = dateFields[numElements - 3];
String month = dateFields[numElements - 4];
String year = dateFields[numElements - 5];
String ts = String.format("%s-%s-%s %s:00:00", year, month, day, hour);
Log.info(ts);
try{
lines = file.readFullyAsUTF8String().split("
");
for (String line : lines) {
c.outputWithTimestamp(KV.of(fileName, line), new Instant(dateTimeFormat.parseMillis(ts)));
}
}
catch(IOException e){
Log.info("failed");
}
}}))
Finally, I window into 1-hour FixedWindows
and log the results:
.apply(Window
.<KV<String,String>>into(FixedWindows.of(Duration.standardHours(1)))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.apply("Log results", ParDo.of(new DoFn<KV<String, String>, Void>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
String file = c.element().getKey();
String value = c.element().getValue();
String eventTime = c.timestamp().toString();
String logString = String.format("File=%s, Line=%s, Event Time=%s, Window=%s", file, value, eventTime, window.toString());
Log.info(logString);
}
}));
For me it worked with .withAllowedLateness(Duration.ZERO)
but depending on the order you might need to set it. Keep in mind that a value too high will cause windows to be open for longer and use more persistent storage.
I set the $BUCKET
and $PROJECT
variables and I just upload two files:
gsutil cp file1 gs://$BUCKET/data/2019/03/17/00/
gsutil cp file2 gs://$BUCKET/data/2019/03/18/22/
And run the job with:
mvn -Pdataflow-runner compile -e exec:java
-Dexec.mainClass=com.dataflow.samples.ChronologicalOrder
-Dexec.args="--project=$PROJECT
--path=gs://$BUCKET/data/**
--stagingLocation=gs://$BUCKET/staging/
--runner=DataflowRunner"
Results:
Full code
Let me know how this works. This was just an example to get started and you might need to adjust windowing and triggering strategies, lateness, etc to suit your use case