I'm trying to write a simple Apache Beam pipeline (which will run on the Dataflow runner) to do the following:
- Read PubSub messages containing file paths on GCS from a subscription.
- For each message, read the data contained in the file associated with the message (the files can be of a variery of formats (csv, jsonl, json, xml, ...)).
- Do some processing on each record.
- Write back the result on GCS.
I'm using a 10 seconds fixed window on the messages. Since incoming files are already chunked (max size of 10MB) I decided not to use splittable do functions to read the files, in order to avoid adding useless complexity (especially for files that are not trivially splittable in chunks).
Here is a simplified code sample that gives the exact same problem of the full one:
package skytv.ingester
import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.StandardCharsets
import kantan.csv.rfc
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.{Compression, FileIO, FileSystems, TextIO, WriteFilesResult}
import org.apache.beam.sdk.io.gcp.pubsub.{PubsubIO, PubsubMessage}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, FixedWindows, PaneInfo, Window}
import org.apache.beam.sdk.transforms.{Contextful, DoFn, MapElements, PTransform, ParDo, SerializableFunction, SimpleFunction, WithTimestamps}
import org.apache.beam.sdk.values.{KV, PCollection}
import org.joda.time.{Duration, Instant}
import skytv.cloudstorage.CloudStorageClient
import skytv.common.Closeable
import kantan.csv.ops._
import org.apache.beam.sdk.io.FileIO.{Sink, Write}
class FileReader extends DoFn[String, List[String]] {
private def getFileReader(filePath: String) = {
val cloudStorageClient = new CloudStorageClient()
val inputStream = cloudStorageClient.getInputStream(filePath)
val isr = new InputStreamReader(inputStream, StandardCharsets.UTF_8)
new BufferedReader(isr)
}
private def getRowsIterator(fileReader: BufferedReader) = {
fileReader
.asUnsafeCsvReader[Seq[String]](rfc
.withCellSeparator(',')
.withoutHeader
.withQuote('"'))
.toIterator
}
@ProcessElement
def processElement(c: ProcessContext): Unit = {
val filePath = c.element()
Closeable.tryWithResources(
getFileReader(filePath)
) {
fileReader => {
getRowsIterator(fileReader)
.foreach(record => c.output(record.toList))
}
}
}
}
class DataWriter(tempFolder: String) extends PTransform[PCollection[List[String]], WriteFilesResult[String]] {
private val convertRecord = Contextful.fn[List[String], String]((dr: List[String]) => {
dr.mkString(",")
})
private val getSink = Contextful.fn[String, Sink[String]]((destinationKey: String) => {
TextIO.sink()
})
private val getPartitioningKey = new SerializableFunction[List[String], String] {
override def apply(input: List[String]): String = {
input.head
}
}
private val getNaming = Contextful.fn[String, Write.FileNaming]((destinationKey: String) => {
new Write.FileNaming {
override def getFilename(
window: BoundedWindow,
pane: PaneInfo,
numShards: Int,
shardIndex: Int,
compression: Compression
): String = {
s"$destinationKey-${window.maxTimestamp()}-${pane.getIndex}.csv"
}
}
})
override def expand(input: PCollection[List[String]]): WriteFilesResult[String] = {
val fileWritingTransform = FileIO
.writeDynamic[String, List[String]]()
.by(getPartitioningKey)
.withDestinationCoder(input.getPipeline.getCoderRegistry.getCoder(classOf[String]))
.withTempDirectory(tempFolder)
.via(convertRecord, getSink)
.withNaming(getNaming)
.withNumShards(1)
input
.apply("WriteToAvro", fileWritingTransform)
}
}
object EnhancedIngesterScalaSimplified {
private val SUBSCRIPTION_NAME = "projects/<project>/subscriptions/<subscription>"
private val TMP_LOCATION = "gs://<path>"
private val WINDOW_SIZE = Duration.standardSeconds(10)
def main(args: Array[String]): Unit = {
val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create()
FileSystems.setDefaultPipelineOptions(options)
val p = Pipeline.create(options)
val messages = p
.apply("ReadMessages", PubsubIO.readMessagesWithAttributes.fromSubscription(SUBSCRIPTION_NAME))
// .apply("AddProcessingTimeTimestamp", WithTimestamps.of(new SerializableFunction[PubsubMessage, Instant] {
// override def apply(input: PubsubMessage): Instant = Instant.now()
// }))
val parsedMessages = messages
.apply("ApplyWindow", Window.into[PubsubMessage](FixedWindows.of(WINDOW_SIZE)))
.apply("ParseMessages", MapElements.via(new SimpleFunction[PubsubMessage, String]() {
override def apply(msg: PubsubMessage): String = new String(msg.getPayload, StandardCharsets.UTF_8)
}))
val dataReadResult = parsedMessages
.apply("ReadData", ParDo.of(new FileReader))
val writerResult = dataReadResult.apply(
"WriteData",
new DataWriter(TMP_LOCATION)
)
writerResult.getPerDestinationOutputFilenames.apply(
"FilesWritten",
MapElements.via(new SimpleFunction[KV[String, String], String]() {
override def apply(input: KV[String, String]): String = {
println(s"Written ${input.getKey}, ${input.getValue}")
input.getValue
}
}))
p.run.waitUntilFinish()
}
}
The problem is that after the processing of some messages (in the order of 1000), the job stops processing new messages and they remain in the PubSub subscription unacknowledged forever.
I saw that in such a situation the watermark stops to advance and the data freshness linearly increases indefinitely.
Here is a screenshot from dataflow:
And here the situation on the PubSub queue:
Is it possible that there are some messages that remain stuck in the dataflow queues filling them so that no new messages can be added?
I thought that there was some problem on how timestamps are computed by the PubsubIO, so I tried to force the timestamps to be equal to the processing time of each message, but I had no success.
If I leave the dataflow job in this state, it seems that it continuously reprocesses the same messages without writing any data to storage.
Do you have any idea on how to solve this problem?
Thanks!
question from:
https://stackoverflow.com/questions/65883170/apache-beam-stops-to-process-pubsub-messages-after-some-time