Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
284 views
in Technique[技术] by (71.8m points)

scala - Why am I getting this timeout during unit test of akka-stream?

I have an akka-gRPC service BiDirectional stream and I am testing it on a unit test. The service has uses akka-stream and I use the TestSink.probe to test the reply message. I am receiving back the messages from the service, but there is an error related to timeout that I cannot figure out what is the reason. This is the test:

object GreeterServiceConf {
  // important to enable HTTP/2 in server ActorSystem's config
  val configServer = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on")
    .withFallback(ConfigFactory.defaultApplication())

  val configString2 =
    """
      |akka.grpc.client {
      |  "helloworld.GreeterService" {
      |    host = 127.0.0.1
      |    port = 8080
      |  }
      |}
      |""".stripMargin
  val configClient = ConfigFactory.parseString(configString2)
}

class GreeterServiceImplSpec extends TestKit(ActorSystem("GreeterServiceImplSpec", ConfigFactory.load(GreeterServiceConf.configServer)))
  with AnyWordSpecLike
  with BeforeAndAfterAll
  with Matchers
  with ScalaFutures {

  implicit val patience: PatienceConfig = PatienceConfig(scaled(5.seconds), scaled(100.millis))

  // val testKit = ActorTestKit(conf)
  val serverSystem: ActorSystem = system
  val bound = new GreeterServer(serverSystem).run()

  // make sure server is bound before using client
  bound.futureValue

  implicit val clientSystem: ActorSystem = ActorSystem("GreeterClient", ConfigFactory.load(GreeterServiceConf.configClient))

  val client = GreeterServiceClient(
    GrpcClientSettings
      .fromConfig("helloworld.GreeterService")
      .withTls(false)
  )

  override def afterAll: Unit = {
    TestKit.shutdownActorSystem(system)
    TestKit.shutdownActorSystem(clientSystem)
  }

  "GreeterService" should {
    "reply to multiple requests" in {
      import GreeterServiceData._

      val names = List("John", "Michael", "Simone")
      val expectedReply: immutable.Seq[HelloReply] = names.map { name =>
        HelloReply(s"Hello, $name -> ${mapHelloReply.getOrElse(name, "this person does not exist =(")}")
      }

      val requestStream: Source[HelloRequest, NotUsed] = Source(names).map(name => HelloRequest(name))
      val responseStream: Source[HelloReply, NotUsed] = client.sayHelloToAll(requestStream)
      val sink = TestSink.probe[HelloReply]
      val replyStream = responseStream.runWith(sink)
      replyStream
        .requestNext(HelloReply(s"Hello, John -> I killed Java"))
        .requestNext(HelloReply(s"Hello, Michael -> We are the Jacksons 5"))
        .requestNext(HelloReply(s"Hello, Simone -> I have found a job to work with Scala =)")) // THIS IS THE LINE 122 ON THE ERROR
        // .request(3)
        // .expectNextUnorderedN(expectedReply) // I also tested this but it did not work
        .expectComplete()
    }
  }
}

The error is:

assertion failed: timeout (3 seconds) during expectMsg while waiting for OnComplete java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg while waiting for OnComplete at scala.Predef$.assert(Predef.scala:223) at akka.testkit.TestKitBase.expectMsg_internal(TestKit.scala:459) at akka.testkit.TestKitBase.expectMsg(TestKit.scala:436) at akka.testkit.TestKitBase.expectMsg$(TestKit.scala:436) at akka.testkit.TestKit.expectMsg(TestKit.scala:969) at akka.stream.testkit.TestSubscriber$ManualProbe.expectComplete(StreamTestKit.scala:479) at com.example.helloworld.GreeterServiceImplSpec.$anonfun$new$5(GreeterServiceImplSpec.scala:121)

question from:https://stackoverflow.com/questions/65942025/why-am-i-getting-this-timeout-during-unit-test-of-akka-stream

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I got it to work based on the project akka-grpc-quickstart-scala.g8. I am executing runForeach to run the graph and have a materialized Sink on the response stream. Then, when the response is done I am doing an assert inside the Future[Done].

    "reply to multiple requests" in {
      import GreeterServiceData._
      import system.dispatcher

      val names = List("John", "Martin", "Michael", "UnknownPerson")
      val expectedReplySeq: immutable.Seq[HelloReply] = names.map { name =>
        HelloReply(s"Hello, $name -> ${mapHelloReply.getOrElse(name, "this person does not exist =(")}")
      }
      // println(s"expectedReplySeq: ${expectedReplySeq.foreach(println)}")

      val requestStream: Source[HelloRequest, NotUsed] = Source(names).map(name => HelloRequest(name))
      val responseStream: Source[HelloReply, NotUsed] = client.sayHelloToAll(requestStream)

      val done: Future[Done] = responseStream.runForeach { reply: HelloReply =>
        // println(s"got streaming reply: ${reply.message}")
        assert(expectedReplySeq.contains(reply))
      }
      // OR USING Sink.foreach[HelloReply])(Keep.right)
      val sinkHelloReply = Sink.foreach[HelloReply] { e =>
        println(s"element: $e")
        assert(expectedReplySeq.contains(e))
      }
      responseStream.toMat(sinkHelloReply)(Keep.right).run().onComplete {
        case Success(value) => println(s"done")
        case Failure(exception) => println(s"exception $exception")
      }
    }

Just to keep the reference of the whole code, the GreeterServiceImplSpec class is here.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

1.4m articles

1.4m replys

5 comments

57.0k users

...