I am using RxJS to process the contents of a file, making an HTTP request for each line and then aggregating the results. However, the source file contains thousands of lines, and I am overloading the remote HTTP API. I need to make sure that I wait for each in-flight HTTP request to call back before starting another one. I'd be open to batching and performing n requests at a time, but for this script performing the requests serially is sufficient.
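To be concrete, the behaviour I'm after is what a plain Promise loop gives. Here is a dependency-free sketch (no Rx), where `fakeHttpRequest` and `runSerially` are just illustrative names standing in for the real request:

```javascript
// Process items one at a time, awaiting each fake request before
// starting the next one.
const lines = ['line1', 'line2', 'line3'];
const log = [];

function fakeHttpRequest(line) {
  // Simulates an async HTTP call that resolves on the next tick.
  return new Promise(resolve => {
    process.nextTick(() => resolve('http response for ' + line));
  });
}

async function runSerially(items) {
  const results = [];
  for (const item of items) {
    log.push('Submitting request');
    results.push(await fakeHttpRequest(item)); // wait before the next one
    log.push('http response');
  }
  return results;
}

runSerially(lines).then(() => {
  console.log(log.join('\n'));
  console.log('Completed');
});
```

An n-at-a-time variant would pull n items per iteration instead of one, but serial is enough here.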
I have the following:
const fs = require('fs');
const rx = require('rx');
const rxNode = require('rx-node');

// Stand-in for the real HTTP request
const doHttpRequest = rx.Observable.fromCallback((params, callback) => {
  process.nextTick(() => {
    callback('http response');
  });
});

rxNode.fromReadableStream(fs.createReadStream('./source-file.txt'))
  .flatMap(t => t.toString().split('\n'))
  .take(5)
  .concatMap(t => {
    console.log('Submitting request');
    return doHttpRequest(t);
  })
  .subscribe(results => {
    console.log(results);
  }, err => {
    console.error('Error', err);
  }, () => {
    console.log('Completed');
  });
However, this does not perform the HTTP requests serially. It outputs:
Submitting request
Submitting request
Submitting request
Submitting request
Submitting request
http response
http response
http response
http response
http response
Completed
If I instead map each line to an observable without concatenating the results (i.e. a plain map with no concatAll()), the requests are made serially, but my subscribe function sees the inner observables before the HTTP requests have returned.
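My hunch is that the logging (and possibly the work) happens when each inner observable is created, not when it is subscribed. Here is a dependency-free sketch of that eager-vs-lazy difference; the names `eager` and `lazy` are purely illustrative:

```javascript
// Compare starting work when a wrapper is *created* (eager) versus
// when it is *invoked* (lazy). The eager pattern produces all the
// "started" lines up front, like my actual output.
const order = [];

function eager(name) {
  order.push('started ' + name);   // side effect fires immediately
  return () => order.push('consumed ' + name);
}

function lazy(name) {
  return () => {                   // side effect deferred until called
    order.push('started ' + name);
    order.push('consumed ' + name);
  };
}

const eagerResults = ['a', 'b'].map(eager); // both "started" logs fire here
eagerResults.forEach(consume => consume());

const lazyResults = ['a', 'b'].map(lazy);   // nothing logged yet
lazyResults.forEach(run => run());

console.log(order.join('\n'));
```

The eager half mirrors my output (all "started" lines first); the lazy half interleaves them the way I want.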
How can I perform the HTTP requests serially so that the output is as below?
Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Completed