Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
16.1k views
in Technique[技术] by (71.8m points)

scala - Creating a flow from actor in Akka Streams

It's possible to create sources and sinks from actors using Source.actorPublisher() and Sink.actorSubscriber() methods respectively. But is it possible to create a Flow from actor?

Conceptually there doesn't seem to be a good reason not to, given that it implements both ActorPublisher and ActorSubscriber traits, but unfortunately, the Flow object doesn't have any method for doing this. In this excellent blog post it's done in an earlier version of Akka Streams, so the question is if it's possible also in the latest (2.4.9) version.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I'm part of the Akka team and would like to use this question to clarify a few things about the raw Reactive Streams interfaces. I hope you'll find this useful.

Most notably, we'll be posting multiple posts on the Akka team blog about building custom stages, including Flows, soon, so keep an eye on it.

Don't use ActorPublisher / ActorSubscriber

Please don't use ActorPublisher and ActorSubscriber. They're too low level and you might end up implementing them in such a way that's violating the Reactive Streams specification. They're a relict of the past and even then were only "power-user mode only". There really is no reason to use those classes nowadays. We never provided a way to build a flow because the complexity is simply explosive if it was exposed as "raw" Actor API for you to implement and get all the rules implemented correctly.

If you really really want to implement raw ReactiveStreams interfaces, then please do use the Specification's TCK to verify your implementation is correct. You will likely be caught off guard by some of the more complex corner cases a Flow (or in RS terminology a Processor has to handle).

Most operations are possible to build without going low-level

Many flows you should be able to simply build by building from a Flow[T] and adding the needed operations onto it, just as an example:

val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)

Which is a reusable description of the Flow.

Since you're asking about power user mode, this is the most powerful operator on the DSL itself: statefulFlatMapConcat. The vast majority of operations operating on plain stream elements is expressable using it: Flow.statefulMapConcat[T](f: () ? (Out) ? Iterable[T]): Repr[T].

If you need timers you could zip with a Source.timer etc.

GraphStage is the simplest and safest API to build custom stages

Instead, building Sources/Flows/Sinks has its own powerful and safe API: the GraphStage. Please read the documentation about building custom GraphStages (they can be a Sink/Source/Flow or even any arbitrary shape). It handles all of the complex Reactive Streams rules for you, while giving you full freedom and type-safety while implementing your stages (which could be a Flow).

For example, taken from the docs, is an GraphStage implementation of the filter(T => Boolean) operator:

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (p(elem)) push(out, elem)
          else pull(in)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

It also handles asynchronous channels and is fusable by default.

In addition to the docs, these blog posts explain in detail why this API is the holy grail of building custom stages of any shape:


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...