Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
623 views
in Technique[技术] by (71.8m points)

rx java - How to address Flowable backpressure by delaying read until buffer has space

I get Flowable<MyItem> from a network request, and need to do some transformations on the MyItem objects, finally pass them on to another Flowable.

Currently I have something like this:

Flowable<MyItem> myStreamTransformed = myClient.getMyItemStream().map(myItem -> {
                return new MyItem(data: myItem.data+"_transformed")
            }).onBackpressureBuffer(100)
return myStreamTransformed

This works, but if the other end reading the transformed Flowable is slow to consume, the buffer often fills up, and error stops the stream. Rather than increase the buffer to some crazy size, I would prefer to delay reading more of the incoming Flowable until there is space in the buffer. I looked at writing my own Flowable, and implement onNext() processing with this logic, but it seemed overly complex for me.

This code is running as part of a Micronaut application, where the incoming Flowable is from a Micronaut generated client for a JSON stream, and the output Flowable is also turned into another stream (by Micronaut) for the caller of the above method.

So it sits between a client and a backend server, transforming some data, but often the backend is a bit faster to produce data than the client is to read, and I would not want to lose data but also not have to build huge buffers as that is kind of the point of streaming..

I believe the underlying operating systems and TCP stacks should be able to manage it. Can't quite figure out the RxJava approach to it though.

Q: How to handle this in RxJava2?

question from:https://stackoverflow.com/questions/65922261/how-to-address-flowable-backpressure-by-delaying-read-until-buffer-has-space

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...