Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
4.0k views
in Technique[技术] by (71.8m points)

java - How to sequentially chain Vertx CompositeFuture using RXJava?

I need to chain sequentially in order Vertx CompositeFutures in a RxJava style for dependent CompositeFuture, avoiding callback hell.

The use case:

Each CompositeFuture.any/all do some async operations that return futures, lets say myList1, myList2, myList3, but I must wait for CompositeFuture.any(myList1) to complete and return success before doing CompositeFuture.any(myList2), and the same from myList2 to myList3. Naturally, the CompositeFuture itself does the jobs async, but just for its set of operations, since the next set have to be done just after the first set goes well.

Doing it in a "callback-hell style" would be:

    public static void myFunc(Vertx vertx, Handler<AsyncResult<CompositeFuture>> asyncResultHandler) {


        CompositeFuture.any(myList1 < Future >)
                .onComplete(ar1 -> {
                    if (!ar1.succeeded()) {
                        asyncResultHandler.handle(ar1);
                    } else {
                        CompositeFuture.any(myList2 < Future >)
                                .onComplete(ar2 -> {
                                            if (!ar2.succeeded()) {
                                                asyncResultHandler.handle(ar2);
                                            } else {
                                                CompositeFuture.all(myList3 < Future >)
                                                        .onComplete(ar3 -> {
                                                            asyncResultHandler.handle(ar3);
                                                        
    .... <ARROW OF CLOSING BRACKETS> ...
}

Now I tried somenthing like this:

    public static void myFunc(Vertx vertx, Handler<AsyncResult<CompositeFuture>> asyncResultHandler) {
        Single
                .just(CompositeFuture.any(myList1 < Future >))
                .flatMap(previousFuture -> rxComposeAny(previousFuture, myList2 < Future >))
                .flatMap(previousFuture -> rxComposeAll(previousFuture, myList3 < Future >))
                .subscribe(SingleHelper.toObserver(asyncResultHandler));
    }

    public static Single<CompositeFuture> rxComposeAny(CompositeFuture previousResult, List<Future> myList) {
        if (previousResult.failed()) return Single.just(previousResult); // See explanation bellow

        CompositeFuture compositeFuture = CompositeFuture.any(myList);
        return Single.just(compositeFuture);
    }

    public static Single<CompositeFuture> rxComposeAll(CompositeFuture previousResult, List<Future> myList) {
        if (previousResult.failed()) return Single.just(previousResult);

        CompositeFuture compositeFuture = CompositeFuture.any(myList);
        return Single.just(compositeFuture);
    }
}

Much more compact and clear. But, I am not succeeding in passing the previous fails to the asyncResultHandler.

My idea was as follows: The flatMap passes the previous CompositeFuture result and I want to check if it failed. The next rxComposeAny/All first checks to see if previous failed, if so, just returns the failed CompositeFuture and so on until it hits the handler in the subscriber. If the previous passed the test, I`m ok to continue passing the current result till the last successful CompositeFuture hits the handler.

The problem is that the check

        if (previousResult.failed()) return Single.just(previousResult); // See explanation bellow

doesn't work, and all the CompositeFutures are processed, but not tested for successful completion, just the last one ends up being passed to the asyncResultHandler which will test for overall failure (but in the case of my code, it ends up cheking just the last one)

I`m using Vertx 3.9.0 and RxJava 2 Vertx API.

Disclosure: I have experience in Vertx, but I'm totally new in RxJava. So I appreciate any answer, from technical solutions to conceptual explanations.

Thank you.

EDIT (after excellent response of @homerman): I need to have the exact same behavior of the "callback hell style" of sequentially dependent CompositeFutures, ie, the next must be called after onComplete and test for completed with failure or success. The complexity comes from the fact that:

  1. I have to use vertx CompositeAll/Any methods, not zip. Zip provides behaviour similar to CompositeAll, but not CompositeAny.
  2. CompositeAll/Any return the completed future just inside onComplete method. If I check it before as showed above, since it is async, I will get unresolved futures.
  3. CompositeAll/Any if failed will not throw error, but failed future inside onComplete, so I cannot use onError from rxJava.

For example, I tried the following change in the rxComposite function:

    public static Single<CompositeFuture> rxLoadVerticlesAny(CompositeFuture previousResult, Vertx vertx, String deploymentName,
                                                             List<Class<? extends Verticle>> verticles, JsonObject config) {
        previousResult.onComplete(event -> {
                    if (event.failed()) {
                        return Single.just(previousResult);

                    } else {
                        CompositeFuture compositeFuture = CompositeFuture.any(VertxDeployHelper.deploy(vertx, verticles, config));
                        return Single.just(compositeFuture);
                    }
                }
        );
    }

But naturally it does not compile, since lambda is void. How can I reproduce this exact same behavior it rxJava in Vertx?


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Just to clarify something...

Each CompositeFuture.any/all do some async operations that return futures, lets say myList1, myList2, myList3, but I must wait for CompositeFuture.any(myList1) to complete and return success before doing CompositeFuture.any(myList2), and the same from myList2 to myList3.

You've offered CompositeFuture.any() and CompositeFuture.all() as points of reference, but the behavior you describe is consistent with all(), which is to say the resulting composite will yield success only if all its constituents do.

For the purpose of my answer, I'm assuming all() is the behavior you expect.

In RxJava, an unexpected error triggered by an exception will result in termination of the stream with the underlying exception being delivered to the observer via the onError() callback.

As a small demo, assume the following setup:

final Single<String> a1 = Single.just("Batch-A-Operation-1");
final Single<String> a2 = Single.just("Batch-A-Operation-2");
final Single<String> a3 = Single.just("Batch-A-Operation-3");

final Single<String> b1 = Single.just("Batch-B-Operation-1");
final Single<String> b2 = Single.just("Batch-B-Operation-2");
final Single<String> b3 = Single.just("Batch-B-Operation-3");

final Single<String> c1 = Single.just("Batch-C-Operation-1");
final Single<String> c2 = Single.just("Batch-C-Operation-2");
final Single<String> c3 = Single.just("Batch-C-Operation-3");

Each Single represents a discrete operation to be performed, and they are logically named according to some logical grouping (ie they are meant to be executed together). For example, "Batch-A" corresponds to your "myList1", "Batch-B" to your "myList2", ...

Assume the following stream:

Single
    .zip(a1, a2, a3, (s, s2, s3) -> {
      return "A's completed successfully";
    })
    .flatMap((Function<String, SingleSource<String>>) s -> {
      throw new RuntimeException("B's failed");
    })
    .flatMap((Function<String, SingleSource<String>>) s -> {
      return Single.zip(c1, c2, c3, (one, two, three) -> "C's completed successfully");
    })
    .subscribe(
        s -> System.out.println("## onSuccess(" + s + ")"),
        t -> System.out.println("## onError(" + t.getMessage() + ")")
    );

(If you're not familiar, the zip() operator can be used to combine the results of all the sources supplied as input to emit another/new source).

In this stream, because the processing of the B's ends up throwing an exception:

  • the stream is terminated during the execution of the B's
  • the exception is reported to the observer (ie the onError() handler is triggered)
  • the C's are never processed

If what you want, however, is to decide for yourself whether or not to execute each branch, one approach you could take is to pass the results from previous operations down the stream using some sort of state holder, like so:

class State {
  final String value;
  final Throwable error;

  State(String value, Throwable error) {
    this.value = value;
    this.error = error;
  }
}

The stream could then be modified to conditionally execute different batches, for example:

Single
    .zip(a1, a2, a3, (s, s2, s3) -> {
      try {
        // Execute the A's here...
        return new State("A's completed successfully", null);

      } catch(Throwable t) {
        return new State(null, t);
      }
    })
    .flatMap((Function<State, SingleSource<State>>) s -> {
      if(s.error != null) {
        // If an error occurred upstream, skip this batch...
        return Single.just(s);

      } else {
        try {
          // ...otherwise, execute the B's
          return Single.just(new State("B's completed successfully", null));
          
        } catch(Throwable t) {
          return Single.just(new State(null, t));
        }
      }
    })
    .flatMap((Function<State, SingleSource<State>>) s -> {
      if(s.error != null) {
        // If an error occurred upstream, skip this batch...
        return Single.just(s);

      } else {
        try {
          // ...otherwise, execute the C's
          return Single.just(new State("C's completed successfully", null));

        } catch(Throwable t) {
          return Single.just(new State(null, t));
        }
      }
    })
    .subscribe(
        s -> {
          if(s.error != null) {
            System.out.println("## onSuccess with error: " + s.error.getMessage());
          } else {
            System.out.println("## onSuccess without error: " + s.value);
          }
        },
        t -> System.out.println("## onError(" + t.getMessage() + ")")
    );

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...