Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
729 views
in Technique[技术] by (71.8m points)

c# - How can I alternately buffer and flow a live data stream in Rx

I have two streams. One is a flow of data (could be any type), the other is a boolean stream acting as a gate. I need to combine these into a stream that has the following behaviour:

  • When the gate is open (most recent value was true) then the data should flow straight through
  • When the gate is closed (most recent value was false) then the data should be buffered to be released as individual elements when the gate is next open
  • The solution should preserve all elements of the data and preserve order

I am not really sure how to put this together. The inputs I have been testing with are like this:

// a demo data stream that emits every second
var dataStream = Observable.Interval(TimeSpan.FromSeconds(1));

// a demo flag stream that toggles every 5 seconds
var toggle = false;
var gateStream = Observable.Interval(TimeSpan.FromSeconds(5))
    .Select(_ => toggle = !toggle);
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I would do this as follows:

  • Window the data stream using the gate stream as a closing selector
  • We can use DistinctUntilChanged on the gate stream to ensure no repeated values
  • We will also force the gate stream to start closed (false) - it won't affect the output and allows a neat trick
  • Then use the overload of Select that gives each element an index number. With this we can tell if we need to buffer or just emit the window as is, because we know that even-numbered windows are for buffering (because we made sure the gate stream starts with false)
  • We can use ToList() to buffer each even window until it closes - this is the actually equivalent of a Buffer() that waits until OnCompleted
  • We use an identity SelectMany to flatten the buffered windows
  • Finally we concatenate the windows in order to guarantee order is preserved

It looks like this:

dataStream.Window(gateStream.StartWith(false).DistinctUntilChanged())
          .Select((w, i) =>  i % 2 == 0 ? w.ToList().SelectMany(x => x) : w)
          .Concat()
          .Subscribe(Console.WriteLine);

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...