Running a stream is asynchronous. For
val completion =
// omitted for brevity
.runWith(Sink.foreach(s => println(s"Got output $s")))
completion is a Future[Done] (the materialized value of Sink.foreach) that will be completed with Done (a singleton) when the stream successfully completes (the future will be failed if the stream fails). Effectively, that line of code is complete and execution moves on once the stream has been materialized and started.
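To make the asynchrony visible, here is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem also supplies the materializer). The names AsyncRunDemo and runDemo, the five-element source, and the throttle are all hypothetical and only serve to slow the stream down enough to observe that runWith returns before the stream is done. With Apache Pekko, the same code should work after swapping the akka package prefix for org.apache.pekko.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object AsyncRunDemo {
  // Hypothetical stream: five elements, throttled to one per 100 ms
  // so that it takes a noticeable amount of time to finish.
  def runDemo()(implicit system: ActorSystem): Future[Done] =
    Source(1 to 5)
      .throttle(1, 100.millis)
      .runWith(Sink.foreach(n => println(s"Got output $n")))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("demo")
    import system.dispatcher // ExecutionContext for onComplete

    val completion = runDemo()
    // Reached once the stream has been materialized, not once it has finished.
    println(s"runWith returned; already completed? ${completion.isCompleted}")

    completion.onComplete { result =>
      // result is Success(Done) on normal completion, Failure(ex) otherwise.
      println(s"Stream finished with $result")
      system.terminate()
    }
  }
}
```

The line printed right after runDemo() will normally appear before most of the "Got output" lines, which is exactly why timing code placed directly after runWith measures almost nothing.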
You can get an upper bound on the time taken by simply moving the code to calculate the elapsed time into an onComplete callback on completion.
completion.onComplete { _ => // there's only one possible value here, so we don't need it
  val stopTime = System.currentTimeMillis()
  val elapsedTime = stopTime - startTime
  println(elapsedTime)
  system.terminate()
}
Note that this callback will execute at some point after the stream completes, but there are no guarantees that it will immediately be executed. That said, as long as the system and JVM you're running this on aren't under a heavy load, it's good enough.
Two other things are worth noting:
- currentTimeMillis really shouldn't be used for reliable benchmarking: it's not even guaranteed to be monotonic (it can go backwards). System.nanoTime is generally more reliable for this purpose.
- It may be more realistic to take the start time right before val completion = ???, as otherwise you're also measuring time to construct the "blueprint" of the stream, not just the time to materialize and run the stream.
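Both points can be combined in a rough sketch, again assuming Akka 2.6 or later. The object TimedStream, the helper elapsed, and the example source are hypothetical. The blueprint is built first with toMat, the start time is read with System.nanoTime immediately before run(), and the stop time is read inside the onComplete callback.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object TimedStream {
  // Hypothetical helper: turns two System.nanoTime readings into a duration.
  def elapsed(startNanos: Long, stopNanos: Long): FiniteDuration =
    (stopNanos - startNanos).nanos

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("timed")
    import system.dispatcher // ExecutionContext for onComplete

    // Constructing the blueprint does not run anything yet.
    val blueprint: RunnableGraph[Future[Done]] =
      Source(1 to 1000000).map(_ * 2).toMat(Sink.ignore)(Keep.right)

    // Start the clock right before materialization, so blueprint
    // construction is excluded from the measurement.
    val startNanos = System.nanoTime()
    val completion = blueprint.run()

    completion.onComplete { result =>
      val took = elapsed(startNanos, System.nanoTime())
      println(s"Stream finished with $result after ${took.toMillis} ms")
      system.terminate()
    }
  }
}
```

This is still only an upper bound, since the callback may run some time after the stream completes. For anything beyond a quick check, a benchmarking harness such as JMH is likely a better fit.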