Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
610 views
in Technique[技术] by (71.8m points)

rx java - Map lazily with RxJava's backpressure

I have a thread pushing events to a PublishProcessor and then a piece of code that consumes the messages with backpressure:

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    // a separate thread calls publishProcessor.onNext every 1s

    publishProcessor
        .onBackpressureBuffer(
            1,
            () -> System.out.println("Buffer full!"),
            BackpressureOverflowStrategy.DROP_OLDEST)
        .observeOn(Schedulers.newThread(), false, 1)
        .map(this::parseMessage)
        .subscribe(this::consume); // consume has a fixed delay of 5s

This works fine, however what I'd like is to be able to expose a function that would return a Flowable of already parsed messages. Like this:

final Flowable<String> parsedMessages = publishProcessor.map(this::parseMessage);

// and then somewhere else:

parsedMessages
    .onBackpressureBuffer(
        1,
        () -> System.out.println("Buffer full!"),
        BackpressureOverflowStrategy.DROP_OLDEST)
    .observeOn(Schedulers.newThread(), false, 1)
    .subscribe(this::consume);

the thing is, I don't want the messages to get parsed if there is no space left in the buffer. This is what happens in the first case when buffer is full:

> Publish msg
> Buffer full!

and this is the 2nd case:

> Publish msg
> Parse msg
> Buffer full!

is there a way to make backpressure work all the way through the flow, even on stages that are added before backpressure buffer is set up?


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
等待大神答复

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...