Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

javascript - Rx.js wait for callback to complete

I am using Rx.js to process the contents of a file, make an HTTP request for each line, and then aggregate the results. However, the source file contains thousands of lines and I am overloading the remote HTTP API that the requests go to. I need to make sure each HTTP request has called back before the next one starts. I'd be open to batching and performing n requests at a time, but for this script performing the requests serially is sufficient.

I have the following:

const fs = require('fs');
const rx = require('rx');
const rxNode = require('rx-node');

const doHttpRequest = rx.Observable.fromCallback((params, callback) => {
  process.nextTick(() => {
    callback('http response');
  });
});

rxNode.fromReadableStream(fs.createReadStream('./source-file.txt'))
  .flatMap(t => t.toString().split('\n'))
  .take(5)
  .concatMap(t => {
    console.log('Submitting request');

    return doHttpRequest(t);
  })
  .subscribe(results => {
    console.log(results);
  }, err => {
    console.error('Error', err);
  }, () => {
    console.log('Completed');
  });

However, this does not perform the HTTP requests serially. It outputs:

Submitting request
Submitting request
Submitting request
Submitting request
Submitting request
http response
http response
http response
http response
http response
Completed

If I remove the call to concatAll(), the requests run serially, but my subscribe function sees the observables before the HTTP requests have returned.

How can I perform the HTTP requests serially so that the output is as below?

Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Completed

1 Reply


The problem here is probably that the function returned by rx.Observable.fromCallback runs the wrapped function as soon as it is called, not when something subscribes to the observable it returns. The returned observable merely delivers, at a later point in time, the value passed to the callback. To see more clearly what is happening, use a slightly more elaborate simulation: number your requests and have each one return a distinct result that you can observe through the subscription.

What I posit happens here:

  • take(5) emits 5 values
  • map logs 5 messages, calls the function 5 times (which starts 5 requests) and passes on 5 observables
  • those 5 observables are handled by concatAll, and the values they emit arrive in order as expected. What you are ordering here is the results of the function calls, not the calls themselves.
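
The difference between ordering results and ordering calls can be reproduced without Rx. The following is a minimal sketch in plain Node.js that uses promises instead of observables; eagerRequest, run and log are hypothetical names introduced only for this illustration, with eagerRequest standing in for doHttpRequest.

```javascript
// Records the order in which things happen.
const log = [];

// Hypothetical stand-in for doHttpRequest: the work starts as soon as it is called.
function eagerRequest(id) {
  log.push(`start ${id}`); // side effect happens at call time
  return new Promise(resolve => setImmediate(() => {
    log.push(`done ${id}`);
    resolve(`response ${id}`);
  }));
}

async function run() {
  // Mapping calls eagerRequest for every item up front, so all requests start here.
  const pending = [1, 2, 3].map(id => eagerRequest(id));

  // Consuming the results one by one orders the results, not the calls.
  const results = [];
  for (const p of pending) {
    results.push(await p);
  }
  return results;
}
```

Calling run() fills log with all three "start" entries before the first "done" entry, which mirrors the five 'Submitting request' lines appearing before any 'http response' line.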

To achieve your aim, you need to call doHttpRequest (the function returned by rx.Observable.fromCallback) only when concatAll subscribes, not when the observable is created. For that you can use defer: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/defer.md

So your code would become:

rxNode.fromReadableStream(fs.createReadStream('./path-to-file'))
  .map(t => t.toString().split('\n'))
  .flatMap(t => t)
  .take(5)
  .map(t => {
    console.log('Submitting request');

    // Defer the call so the request starts only when concatAll subscribes.
    return rx.Observable.defer(function () { return doHttpRequest(t); });
  })
  .concatAll()
  .subscribe(results => {
    console.log(results);
  }, err => {
    console.error('Error', err);
  }, () => {
    console.log('Completed');
  });

You can see a similar issue with an excellent explanation here: "How to start second observable *only* after first is *completely* done in rxjs"

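Your log is likely to still show 5 consecutive 'Submitting request' messages, because console.log runs in map rather than inside the deferred function. The requests themselves, however, should be executed one at a time, each starting only after the previous one has completed, as you wish.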
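
If the interleaved log matters too, the log call has to run at the moment the request actually starts; in the Rx version that would mean moving console.log inside the function passed to defer. The idea can be sketched without Rx, again in plain Node.js with promises. The names fakeRequest, deferred and runSerially are hypothetical and only illustrate the deferral; this is not the Rx implementation.

```javascript
// Records the order in which things happen.
const log = [];

// Hypothetical stand-in for doHttpRequest: begins work when it is called.
function fakeRequest(id) {
  log.push(`Submitting request ${id}`);
  return new Promise(resolve => setImmediate(() => {
    log.push(`http response ${id}`);
    resolve(`http response ${id}`);
  }));
}

// Wrap each call in a function so that nothing starts until it is invoked.
// This is the role defer plays in the Rx pipeline.
const deferred = [1, 2, 3].map(id => () => fakeRequest(id));

async function runSerially(factories) {
  const results = [];
  for (const start of factories) {
    // The next request starts only after the previous one has resolved.
    results.push(await start());
  }
  return results;
}
```

Running runSerially(deferred) leaves log alternating between "Submitting request" and "http response" entries, one pair per request.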

