A more functional approach would be possible if the `Stream` had some kind of iterator interface that divides the list into its first element and its successors (the way Haskell lists are built; you seem to know them).
I know this code is more complex (or at least longer) at first, but using the resulting structures becomes more convenient:
/**
 * Minimal multi-value promise (intentionally shadows the built-in `Promise`).
 *
 * The author's caveat still applies: real promise libraries are better, and
 * this one is not even monadic. It only supports a single "data" event that
 * may carry several values.
 *
 * @param {function(function(...*))} resolver - Called synchronously with a
 *   `resolve` function. All arguments passed to `resolve` become the
 *   promise's values.
 */
function Promise(resolver) {
    var subs = [],   // subscribers registered before the promise settled
        res = null;  // `arguments` object of the settling resolve() call

    resolver(function resolve() {
        // Settle only once. Without this guard a second resolve() call would
        // overwrite `res`, so late subscribers would observe different values
        // than the ones already delivered to earlier subscribers.
        if (res) return;
        res = arguments;
        // Flush in registration order; each subscriber is removed before it
        // runs, so it is called at most once.
        while (subs.length) subs.shift().apply(null, res);
    });

    /**
     * Subscribes `f` to the promise's values.
     *
     * If the promise has already settled, `f` is invoked immediately (and
     * synchronously); otherwise it is queued until `resolve` is called.
     *
     * @param {function(...*)} f - Receives the resolved values as arguments.
     * @returns {Promise} This promise, for chaining.
     */
    this.onData = function(f) {
        if (res)
            f.apply(null, res);
        else
            subs.push(f);
        return this;
    };
}
/**
 * Waits for several promises in parallel.
 *
 * Promises may be passed as separate arguments and/or as arrays of promises
 * (arrays are flattened one level).
 *
 * The returned promise resolves once every input has delivered its data. It
 * resolves with one argument per value position: argument `k` is an array
 * whose element `index` is the `k`-th value delivered by input `index`.
 *
 * Note: with no inputs, `resolve` is never called, so the returned promise
 * stays pending.
 *
 * @returns {Promise} Promise for the collected values.
 */
Promise.all = function() {
    var promises = [].concat.apply([], arguments);
    return new Promise(function(resolve) {
        var columns = [];
        var pending = promises.length;
        promises.forEach(function(promise, index) {
            promise.onData(function() {
                var values = arguments;
                var k;
                // Make sure there is one column per delivered value.
                for (k = columns.length; k < values.length; k++) {
                    columns.push([]);
                }
                // Store each value at this promise's position in its column.
                for (k = 0; k < values.length; k++) {
                    columns[k][index] = values[k];
                }
                pending -= 1;
                if (pending === 0) {
                    resolve.apply(null, columns);
                }
            });
        });
    });
};
/**
 * An asynchronous list of random numbers.
 *
 * `first` is a promise that resolves with two values: a random number and a
 * promise for the next element, which is produced the same way. The delay
 * between elements is chosen once per stream, as a whole number of
 * milliseconds from 500 to 599.
 *
 * `id` always holds the handle of the most recently scheduled timeout.
 */
function Stream() {
    var self = this;
    var delay = Math.floor(Math.random() * 100 + 500);

    // Resolver shared by every element: schedules the element's value and
    // hands out a fresh promise (using this same resolver) for its successor.
    function schedule(resolve) {
        self.id = setTimeout(function() {
            resolve(Math.random(), new Promise(schedule));
        }, delay);
    }

    this.first = new Promise(schedule);
}
/**
 * Consumes the stream: calls `f` with each element's value as it arrives.
 *
 * @param {function(*)} f - Invoked with the value of every element.
 * @returns {Stream} This stream, for chaining.
 */
Stream.prototype.forEach = function(f) {
    // Handle one element, then subscribe to the promise for the next one.
    function step(value, rest) {
        f(value);
        rest.onData(step);
    }

    this.first.onData(step);
    return this;
};
/**
 * Stops the stream by clearing the timeout whose handle is stored in
 * `this.id`, so the pending element is never produced.
 *
 * @returns {Stream} This stream, for chaining.
 */
Stream.prototype.end = function() {
    clearTimeout(this.id);
    return this;
};
But zipping them is easy now:
/**
 * Zips any number of streams into a single stream.
 *
 * Each element of the result is an array holding the values found at the
 * same position in every source stream; it becomes available once all
 * sources have delivered that position.
 *
 * The result inherits the `Stream` interface from `Stream.prototype`, but is
 * not created by the `Stream` constructor.
 *
 * @param {...Stream} streams - The streams to combine (read via `arguments`).
 * @returns {Stream} The zipped stream.
 */
function zip() {
    var zipped = Object.create(Stream.prototype);
    var heads = Array.prototype.map.call(arguments, function(stream) {
        return stream.first;
    });

    // Builds the promise for one zipped element from the promises of the
    // corresponding source elements; the successor is built recursively from
    // the sources' "next" promises.
    function zipFrom(current) {
        return new Promise(function(resolve) {
            Promise.all(current).onData(function(values, successors) {
                resolve(values, zipFrom(successors));
            });
        });
    }

    zipped.first = zipFrom(heads);
    return zipped;
}
// Demo: zip two random streams and log every pair of values as it arrives.
zip(new Stream(), new Stream())
    .forEach(console.log.bind(console));
Basically, I've generalized your waiting for the first items into the Promise pattern, where `Promise.all` provides parallel waiting, and turned your mutable arrays of results into nested lists of promises. I've also avoided code duplication (for `left` and `right`) by making all functions work with any number of arguments.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…