Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


scala - Why doesn't the Akka Streams cycle end in this graph?

I would like to create a graph that loops n times before going to the sink. I've created this sample, which fulfills my requirements, but it doesn't end after reaching the sink, and I really don't understand why. Can someone enlighten me?

Thanks.

    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    import akka.stream.{ActorMaterializer, UniformFanOutShape}

    import scala.concurrent.Future

    object test {
      def main(args: Array[String]) {
        val ignore: Sink[Any, Future[Unit]] = Sink.ignore
        val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
          sink => {
            import FlowGraph.Implicits._

            val fileSource = Source.single((0, Array[String]()))
            val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
            val afterMerge = Flow[(Int, Array[String])].map {
              e =>
                println("after merge")
                e
            }
            val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
            val toRetry = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("retry " + (r < 3) + " " + r)
                r < 3
              }
            }.map {
              case (r, s) => (r + 1, s)
            }
            val toSink = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("sink " + (r >= 3) + " " + r)
                r >= 3
              }
            }
            merge.preferred <~ toRetry <~ broadcastArray
            fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
          }
        }
        implicit val system = ActorSystem()
        implicit val _ = ActorMaterializer()
        val run: Future[Unit] = closed.run()
        import system.dispatcher
        run.onComplete {
          case _ => {
            println("finished")
            system.shutdown()
          }
        }
      }
    }

1 Reply


The Stream is never completed because the merge never signals completion.

After formatting your graph structure, it basically looks like:

//ignoring the preferred which is inconsequential

fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
              merge <~ toRetry    <~ broadcastArray

The non-completion is rooted in your merge step:

// 2 inputs into merge

fileSource ~> merge 
              merge <~ toRetry

Once fileSource has emitted its single element, namely (0, Array.empty[String]), it signals completion to merge.

However, fileSource's completion signal does not propagate past merge. From the documentation:

akka.stream.scaladsl.MergePreferred

Completes when all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)

With the default eagerClose=false, merge does not signal completion until all of its input streams have completed.

// fileSource is complete ~> merge 
//                           merge <~ toRetry is still running

// complete fileSource + still running toRetry = still running merge

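Therefore, merge waits for toRetry to complete as well. But toRetry can only complete once broadcastArray completes, and broadcastArray sits downstream of merge, so it is itself waiting for merge to complete. Neither side can finish first, and the stream never terminates.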
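The circular wait described above can be reproduced with a toy model in plain Scala. This is only a sketch of the completion rule quoted from the documentation, not Akka's implementation: the names CompletionModel, Completion, step and settle are made up for illustration, and each flag simply means "this stage has completed".

```scala
// Toy model in plain Scala (NOT Akka code): each flag means "this stage has completed".
object CompletionModel {
  final case class Completion(fileSource: Boolean, merge: Boolean, toRetry: Boolean)

  // One propagation step of the completion rules described above.
  def step(s: Completion, eagerClose: Boolean): Completion = {
    val mergeMayComplete =
      if (eagerClose) s.fileSource || s.toRetry // one completed upstream is enough
      else s.fileSource && s.toRetry            // every upstream must have completed
    Completion(
      fileSource = s.fileSource,
      merge = s.merge || mergeMayComplete,
      // toRetry is fed (via broadcastArray) by merge, so it completes only after merge has.
      toRetry = s.toRetry || s.merge
    )
  }

  // Apply `step` until the state stops changing.
  @scala.annotation.tailrec
  def settle(s: Completion, eagerClose: Boolean): Completion = {
    val next = step(s, eagerClose)
    if (next == s) s else settle(next, eagerClose)
  }

  // fileSource has already emitted its single element and completed.
  val start: Completion = Completion(fileSource = true, merge = false, toRetry = false)

  val stuck: Completion = settle(start, eagerClose = false) // merge never completes
  val done: Completion  = settle(start, eagerClose = true)  // everything completes
}
```

In this model, CompletionModel.stuck still has merge = false no matter how often the rules are applied, which mirrors the hanging stream, while CompletionModel.done ends with all three flags set.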

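If you want your specific graph to complete once fileSource completes, set eagerClose = true, which makes merge complete as soon as any one of its inputs does. Be aware that merge may then shut down while an element is still travelling around the cycle, so verify that the element still reaches the sink. E.g.: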

//Add this true                                             |
//                                                          V
val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge"))

Without the Stream Cycle

A simpler solution exists for your problem. Just use a single Flow.map stage that applies a tail-recursive function:

//Note: there is no use of akka in this implementation

type FileInputType = (Int, Array[String])

@scala.annotation.tailrec
def recursiveRetry(fileInput : FileInputType) : FileInputType = 
  fileInput match { 
    case (r,_) if r >= 3  => fileInput
    case (r,a)            => recursiveRetry((r+1, a))
  }    

Your stream would then be reduced to

//ring-fenced akka code

val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry

fileSource ~> recursiveRetryFlow ~> toSink ~> sink

The result is a cleaner stream that avoids mixing "business logic" with Akka code. The retry loop you have embedded in your stream is business logic, so the mixed implementation stays tightly coupled to Akka going forward, for better or worse. Keeping it separate allows the retry functionality to be unit tested completely independently of any third-party library.

Also, in the segregated solution the cycle is contained in a tail-recursive function, which is idiomatic Scala.
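As a sketch of what such a library-free check could look like, the snippet below repeats the recursiveRetry definition from above and exercises it with plain assertions. The RetryCheck wrapper object and the sample payload are illustrative assumptions; a real project would probably use its test framework of choice instead of a main method.

```scala
// Plain Scala only: no Akka (or any other library) is needed to test the retry logic.
object RetryCheck {
  type FileInputType = (Int, Array[String])

  // Same definition as in the answer: bump the counter until it reaches 3.
  @scala.annotation.tailrec
  def recursiveRetry(fileInput: FileInputType): FileInputType =
    fileInput match {
      case (r, _) if r >= 3 => fileInput
      case (r, a)           => recursiveRetry((r + 1, a))
    }

  def main(args: Array[String]): Unit = {
    val payload = Array("a", "b") // hypothetical sample data

    // Starting below the limit: the counter is incremented up to 3.
    val (count, data) = recursiveRetry((0, payload))
    assert(count == 3)
    assert(data eq payload) // the array is passed through untouched

    // Starting at or above the limit: the input is returned as-is.
    assert(recursiveRetry((5, payload))._1 == 5)

    println("recursiveRetry checks passed")
  }
}
```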

