Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
155 views
in Technique[技术] by (71.8m points)

asynchronous - Calculate the time stream takes to complete in akka stream with and without async

I want to calculate the time akka stream takes to complete object Demo extends App {

  implicit val system       = ActorSystem("MyDemo")
  implicit val materializer = ActorMaterializer()
  val startTime = System.currentTimeMillis


  System.out.println(elapsedTime)
  val flowA = Flow[String].map { element ?
    println(s"Flow A : $element ${Thread.currentThread().getName()}" )
    Thread.sleep(1000)
     element
  }

  val flowB = Flow[String].map { element ?
    println(s"Flow B : $element ${Thread.currentThread().getName()}" )
    Thread.sleep(1000)
    element
  }

  val flowC = Flow[String].map { element ?
    println(s"Flow C : $element ${Thread.currentThread().getName()}" )
    Thread.sleep(1000)
    element
  }

  import system.dispatcher
  val completion = Source(List("Java", "Scala", "C++"))
    .via(flowA)
    .via(flowB)
    .via(flowC)
    .runWith(Sink.foreach(s ? println("Got output " + s)))
  val stopTime = System.currentTimeMillis
  val elapsedTime = stopTime - startTime
  println(elapsedTime)
  completion.onComplete(_ => system.terminate())

Output

 0
113
Flow A : Java MyDemo-akka.actor.default-dispatcher-4
Flow B : Java MyDemo-akka.actor.default-dispatcher-4
Flow C : Java MyDemo-akka.actor.default-dispatcher-4
Got output Java
Flow A : Scala MyDemo-akka.actor.default-dispatcher-4
Flow B : Scala MyDemo-akka.actor.default-dispatcher-4
Flow C : Scala MyDemo-akka.actor.default-dispatcher-4
Got output Scala
Flow A : C++ MyDemo-akka.actor.default-dispatcher-4
Flow B : C++ MyDemo-akka.actor.default-dispatcher-4
Flow C : C++ MyDemo-akka.actor.default-dispatcher-4
Got output C++

Queries

  1. the elapsed time 113 gets printed before streams completes, not clear with the reason. I want to print the elapsed time after stream completes the processing
  2. how can we calculate the time taken to complete stream processing as as I want to compare results of time taken using .map versus replacing .map with .async
question from:https://stackoverflow.com/questions/65626819/calculate-the-time-stream-takes-to-complete-in-akka-stream-with-and-without-asyn

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Running a stream is asynchronous. For

val completion =
   // omitted for brevity
   .runWith(Sink.foreach(s => println(s"Got output $s")))

completion is a Future[Done] (the materialized value of Sink.foreach) that will be completed with Done (a singleton) when the stream successfully completes (the future will be failed if the stream fails). Effectively that line of code is complete and execution moves on once the stream has been materialized and started.

You can get an upper-bound on the time taken by simply moving the code to calculate the elapsed time into an onComplete callback on completion.

completion.onComplete { _ =>  // there's only one possible value here, so we don't need it
  val stopTime = System.currentTimeMillis()
  val elapsedTime = stopTime - startTime
  println(elapsedTime)
  system.terminate()
}

Note that this callback will execute at some point after the stream completes, but there are no guarantees that it will immediately be executed. That said, as long as the system and JVM you're running this on aren't under a heavy load, it's good enough.

Two other things are worth noting:

  • currentTimeMillis really shouldn't be used for reliable benchmarking: it's not even guaranteed to be monotonic (it can go backwards). System.nanoTime is generally more reliable for this purpose.
  • It may be more realistic to take the start time right before val completion = ???, as otherwise you're also measuring time to construct the "blueprint" of the stream, not just the time to materialize and run the stream.

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...