Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
309 views
in Technique[技术] by (71.8m points)

javascript - Node.js: splitting stream content for n-parts

I'm trying to understand node streams and their life-cycle. So, I want to split the content of a stream for n-parts. The code below is just to explain my intentions and to show that I already try something by myself. I omitted some details

I have a stream which just generates some data(just a sequence of numbers):

class Stream extends Readable {
  constructor() {
    super({objectMode: true, highWaterMark: 1})
    this.counter = 0
  }

  _read(size) {
    if(this.counter === 30) {
      this.push(null)
    } else {
      this.push(this.counter)
    }
    this.counter += 1
  }
}

const stream = new Stream()
stream.pause();

a function which tries to take n next chunks:

function take(stream, count) {
  const result = []
  return new Promise(function(resolve) {
    stream.once('readable', function() {
      var chunk;
      do {
        chunk = stream.read()
        if (_.isNull(chunk) || result.length > count) {
          stream.pause()
          break
        }
        result.push(chunk)
      } while(true)
      resolve(result)
    })
  })
}

and want to use it like this:

take(stream, 3)
  .then(res => {
    assert.deepEqual(res, [1, 2, 3])
    return take(stream, 3)
  })
  .then(res => {
    assert.deepEqual(res, [4, 5, 6])
  })

What is the idiomatic way to do that?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Using ReadableStream you could use a single function to check if elements of current chunk of data is equal to expected result.

Create variables, CHUNK and N, where CHUNK is the number of elements to slice or splice from original array, N is the variable incremented by CHUNK at each .enqueue() call within pull() call.

const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];

let N = 0;

const stream = new ReadableStream({
  pull(controller) {
    if (N < data.length)
      // slice `N, N += CHUNK` elements from `data`
      controller.enqueue(data.slice(N, N += CHUNK))
    else
      // if `N` is equal to `data.length` call `.close()` on stream
      controller.close()
  }
});

const reader = stream.getReader();

const processData = ({value, done}) => {
  // if stream is closed return `result`; `reader.closed` returns a `Promise`
  if (done) return reader.closed.then(() => result);
  if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read().then(data => processData(data))
  }
}

const readComplete = res => console.log(`result: [${res}]`);

reader.read()
.then(processData)
.then(readComplete)
.catch(err => console.log(err));

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...