scala - Apache Beam stops processing PubSub messages after some time

I'm trying to write a simple Apache Beam pipeline (which will run on the Dataflow runner) to do the following:

  • Read PubSub messages containing GCS file paths from a subscription.
  • For each message, read the data contained in the file it points to (the files can come in a variety of formats: CSV, JSONL, JSON, XML, ...).
  • Do some processing on each record.
  • Write the results back to GCS.

I'm using a 10-second fixed window on the messages. Since incoming files are already chunked (max size of 10 MB), I decided not to use splittable DoFns to read the files, in order to avoid adding needless complexity (especially for files that are not trivially splittable into chunks).

Here is a simplified code sample that exhibits the exact same problem as the full one:

package skytv.ingester

import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.StandardCharsets

import kantan.csv.rfc
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.{Compression, FileIO, FileSystems, TextIO, WriteFilesResult}
import org.apache.beam.sdk.io.gcp.pubsub.{PubsubIO, PubsubMessage}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, FixedWindows, PaneInfo, Window}
import org.apache.beam.sdk.transforms.{Contextful, DoFn, MapElements, PTransform, ParDo, SerializableFunction, SimpleFunction, WithTimestamps}
import org.apache.beam.sdk.values.{KV, PCollection}
import org.joda.time.{Duration, Instant}
import skytv.cloudstorage.CloudStorageClient
import skytv.common.Closeable
import kantan.csv.ops._
import org.apache.beam.sdk.io.FileIO.{Sink, Write}

class FileReader extends DoFn[String, List[String]] {

  private def getFileReader(filePath: String) = {
    val cloudStorageClient = new CloudStorageClient()
    val inputStream = cloudStorageClient.getInputStream(filePath)
    val isr = new InputStreamReader(inputStream, StandardCharsets.UTF_8)
    new BufferedReader(isr)
  }

  private def getRowsIterator(fileReader: BufferedReader) = {
    fileReader
      .asUnsafeCsvReader[Seq[String]](rfc
        .withCellSeparator(',')
        .withoutHeader
        .withQuote('"'))
      .toIterator
  }

  @ProcessElement
  def processElement(c: ProcessContext): Unit = {
    val filePath = c.element()

    Closeable.tryWithResources(
      getFileReader(filePath)
    ) {
      fileReader => {

        getRowsIterator(fileReader)
          .foreach(record => c.output(record.toList))

      }
    }


  }
}

class DataWriter(tempFolder: String) extends PTransform[PCollection[List[String]], WriteFilesResult[String]] {

  private val convertRecord = Contextful.fn[List[String], String]((dr: List[String]) => {
    dr.mkString(",")
  })

  private val getSink = Contextful.fn[String, Sink[String]]((destinationKey: String) => {
    TextIO.sink()
  })

  private val getPartitioningKey = new SerializableFunction[List[String], String] {
    override def apply(input: List[String]): String = {
      input.head
    }
  }

  private val getNaming = Contextful.fn[String, Write.FileNaming]((destinationKey: String) => {
    new Write.FileNaming {
      override def getFilename(
                                window: BoundedWindow,
                                pane: PaneInfo,
                                numShards: Int,
                                shardIndex: Int,
                                compression: Compression
                              ): String = {
        s"$destinationKey-${window.maxTimestamp()}-${pane.getIndex}.csv"
      }
    }
  })

  override def expand(input: PCollection[List[String]]): WriteFilesResult[String] = {
    val fileWritingTransform = FileIO
      .writeDynamic[String, List[String]]()
      .by(getPartitioningKey)
      .withDestinationCoder(input.getPipeline.getCoderRegistry.getCoder(classOf[String]))
      .withTempDirectory(tempFolder)
      .via(convertRecord, getSink)
      .withNaming(getNaming)
      .withNumShards(1)

    input
      .apply("WriteToAvro", fileWritingTransform)
  }

}

object EnhancedIngesterScalaSimplified {

  private val SUBSCRIPTION_NAME = "projects/<project>/subscriptions/<subscription>"
  private val TMP_LOCATION = "gs://<path>"

  private val WINDOW_SIZE = Duration.standardSeconds(10)

  def main(args: Array[String]): Unit = {

    val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create()

    FileSystems.setDefaultPipelineOptions(options)

    val p = Pipeline.create(options)

    val messages = p
      .apply("ReadMessages", PubsubIO.readMessagesWithAttributes.fromSubscription(SUBSCRIPTION_NAME))
//      .apply("AddProcessingTimeTimestamp", WithTimestamps.of(new SerializableFunction[PubsubMessage, Instant] {
//        override def apply(input: PubsubMessage): Instant = Instant.now()
//      }))

    val parsedMessages = messages
      .apply("ApplyWindow", Window.into[PubsubMessage](FixedWindows.of(WINDOW_SIZE)))
      .apply("ParseMessages", MapElements.via(new SimpleFunction[PubsubMessage, String]() {
        override def apply(msg: PubsubMessage): String = new String(msg.getPayload, StandardCharsets.UTF_8)
      }))

    val dataReadResult = parsedMessages
      .apply("ReadData", ParDo.of(new FileReader))

    val writerResult = dataReadResult.apply(
      "WriteData",
      new DataWriter(TMP_LOCATION)
    )

    writerResult.getPerDestinationOutputFilenames.apply(
      "FilesWritten",
      MapElements.via(new SimpleFunction[KV[String, String], String]() {
      override def apply(input: KV[String, String]): String = {
        println(s"Written ${input.getKey}, ${input.getValue}")
        input.getValue
      }
    }))

    p.run.waitUntilFinish()
  }
}

The problem is that after processing some messages (on the order of 1000), the job stops processing new messages and they remain in the PubSub subscription, unacknowledged, forever.

I observed that in this situation the watermark stops advancing and the data freshness increases linearly, indefinitely.

Here is a screenshot from Dataflow: [image: the Dataflow behavior on the read step]

And here is the situation on the PubSub queue: [image: the PubSub message queue state]

Is it possible that some messages remain stuck in the Dataflow queues, filling them up so that no new messages can be added?

I thought there might be a problem with how timestamps are computed by PubsubIO, so I tried forcing the timestamps to be equal to the processing time of each message, but with no success.

If I leave the Dataflow job in this state, it seems to continuously reprocess the same messages without ever writing any data to storage.

Do you have any idea on how to solve this problem?

Thanks!

Question from: https://stackoverflow.com/questions/65883170/apache-beam-stops-to-process-pubsub-messages-after-some-time

1 Reply

Most likely the pipeline has encountered an error while processing one (or more) elements (and it shouldn't have anything to do with how timestamps are computed by PubsubIO). This stops the watermark from advancing, since the failed work will be retried again and again on Dataflow.

You can check the logs for failures, specifically from the worker or harness components. If there's an unhandled runtime exception, such as a parse error, it is very likely the root cause of the streaming pipeline getting stuck.
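
If an unhandled exception in the file-reading step does turn out to be the culprit, a common mitigation is to catch it inside the DoFn, log it, and route the offending element to a dead-letter output instead of letting the bundle fail and be retried forever. Below is a minimal sketch against the question's FileReader, using Beam's standard multi-output ParDo (TupleTag / withOutputTags); the SafeFileReader and FileReaderTags names and the elided parsing logic are illustrative assumptions, not the asker's actual code.

import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.{DoFn, ParDo}
import org.apache.beam.sdk.values.{PCollectionTuple, TupleTag, TupleTagList}
import org.slf4j.LoggerFactory

// Hypothetical output tags: "main" carries the parsed rows, "deadLetter"
// carries the file paths that could not be read or parsed.
object FileReaderTags {
  val main = new TupleTag[List[String]]() {}
  val deadLetter = new TupleTag[String]() {}
}

class SafeFileReader extends DoFn[String, List[String]] {

  // Transient so the non-serializable logger is not captured in the DoFn's serialized state.
  @transient private lazy val log = LoggerFactory.getLogger(classOf[SafeFileReader])

  @ProcessElement
  def processElement(c: ProcessContext): Unit = {
    val filePath = c.element()
    try {
      // ... same read/parse logic as FileReader above, emitting rows via c.output(...) ...
    } catch {
      case e: Exception =>
        // Surface the failure in the worker logs and send the offending path to the
        // dead-letter output instead of letting the bundle fail and be retried forever.
        log.error(s"Failed to process file $filePath", e)
        c.output(FileReaderTags.deadLetter, filePath)
    }
  }
}

// Wiring in the pipeline, replacing the plain ParDo:
//
//   val readResults: PCollectionTuple = parsedMessages.apply(
//     "ReadData",
//     ParDo.of(new SafeFileReader)
//       .withOutputTags(FileReaderTags.main, TupleTagList.of(FileReaderTags.deadLetter)))
//
//   val dataReadResult = readResults.get(FileReaderTags.main)       // PCollection[List[String]]
//   val failedFiles    = readResults.get(FileReaderTags.deadLetter) // PCollection[String]

With something like this in place, a single bad file no longer blocks the watermark, and the failed paths can be written out separately for later inspection.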

If there's no UserCodeException, then it is likely some other issue caused by the Dataflow backend, and you can reach out to Dataflow customer support so engineers can look into the backend issue for your pipeline.

