Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
823 views
in Technique[技术] by (71.8m points)

reactive programming - Simplify Spring Reactor stream - checking if at least one condition is true and then proceed

Have you any ideas that can help me simplify this stream?

The main challenge here is to check in a reactive way if consumers exist on a queue and then if there are no consumers check if there is at least one message on any of the queues.

Mono<String> resumeFullSync(FullSyncContext fullSyncContext,
                            Function<FullSyncContext, Mono<Void>> finalizeFullSyncCallback) {
    var fullSyncSpec = new FullSyncSpecification(fullSyncProperties, fullSyncContext);
    return sender.declare(fullSyncSpec.getQueueSpecification())
            .flatMap(declareOk -> {
                        if (declareOk.getConsumerCount() == 0) {
                            log.debug("Queue has no consumers ({})", fullSyncContext.getCategoryName());
                            return Flux.concat(Mono.just(declareOk),
                                    sender.declare(fullSyncSpec.getRetry1QueueSpecification()),
                                    sender.declare(fullSyncSpec.getRetry2QueueSpecification()),
                                    sender.declare(fullSyncSpec.getRetry3QueueSpecification()))
                                    .any(d -> d.getMessageCount() > 0)
                                    .flatMap(messagesExist -> {
                                        if (messagesExist) {
                                            log.debug("Queues have some messages ({})", fullSyncContext.getCategoryName());
                                            return sender.declare(fullSyncSpec.getExchangeSpecification())
                                                    .then(sender.bind(fullSyncSpec.getBindingSpecification()))
                                                    .then(sender.bind(fullSyncSpec.getRetry1BindingSpecification()))
                                                    .then(sender.bind(fullSyncSpec.getRetry2BindingSpecification()))
                                                    .then(sender.bind(fullSyncSpec.getRetry3BindingSpecification()))
                                                    .doOnNext(b -> setupFullSyncConsumer(fullSyncSpec, finalizeFullSyncCallback));
                                        } else {
                                            log.debug("Queues have no messages ({}). Stopping", fullSyncContext.getCategoryName());
                                            return Mono.empty();
                                        }
                                    });
                        } else {
                            log.debug("Queue has consumers ({}). Stopping", fullSyncContext.getCategoryName());
                            return Mono.empty();
                        }
                    }
            )
            .thenReturn(fullSyncContext.getCategoryName());
}
question from:https://stackoverflow.com/questions/65892097/simplify-spring-reactor-stream-checking-if-at-least-one-condition-is-true-and

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I simplified it myself and now it looks better.

Mono<String> resumeFullSync(FullSyncContext fullSyncContext,
                            Function<FullSyncContext, Mono<Void>> finalizeFullSyncCallback) {
    var fullSyncSpec = new FullSyncSpecification(fullSyncProperties, fullSyncContext);
    return sender.declare(fullSyncSpec.getQueueSpecification())
            .filter(declareOk -> declareOk.getConsumerCount() == 0)
            .flatMapMany(declareOk -> Flux.concat(Mono.just(declareOk),
                    sender.declare(fullSyncSpec.getRetry1QueueSpecification()),
                    sender.declare(fullSyncSpec.getRetry2QueueSpecification()),
                    sender.declare(fullSyncSpec.getRetry3QueueSpecification())))
            .any(declareOk -> declareOk.getMessageCount() > 0)
            .filter(messagesExist -> messagesExist)
            .flatMap(ignore -> {
                log.debug("Queues have some messages ({})", fullSyncContext.getCategoryName());
                return sender.declare(fullSyncSpec.getExchangeSpecification())
                        .then(sender.bind(fullSyncSpec.getBindingSpecification()))
                        .then(sender.bind(fullSyncSpec.getRetry1BindingSpecification()))
                        .then(sender.bind(fullSyncSpec.getRetry2BindingSpecification()))
                        .then(sender.bind(fullSyncSpec.getRetry3BindingSpecification()))
                        .doOnNext(b -> setupFullSyncConsumer(fullSyncSpec, finalizeFullSyncCallback));
            })
            .thenReturn(fullSyncContext.getCategoryName());
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...