Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


javascript - Implementing a buffered transform stream

I am trying to implement a stream with the new Node.js streams API that will buffer a certain amount of data. When this stream is piped to another stream, or if something consumes readable events, this stream should flush its buffer and then simply become pass-through. The catch is, this stream will be piped to many other streams, and when each destination stream is attached, the buffer must be flushed even if it is already flushed to another stream.

For example:

  1. BufferStream implements stream.Transform, and keeps a 512KB internal ring buffer
  2. ReadableStreamA is piped to an instance of BufferStream
  3. BufferStream writes to its ring buffer, reading data from ReadableStreamA as it comes in. (It doesn't matter if data is lost, as the buffer overwrites old data.)
  4. BufferStream is piped to WritableStreamB
  5. WritableStreamB receives the entire 512KB buffer, and continues to get data as it is written from ReadableStreamA through BufferStream.
  6. BufferStream is piped to WritableStreamC
  7. WritableStreamC also receives the entire 512KB buffer, but this buffer is now different than what WritableStreamB received, because more data has since been written to BufferStream.

Is this possible with the streams API? The only method I can think of would be to create an object with a method that spins up a new PassThrough stream for each destination, meaning I couldn't simply pipe to and from it.
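The PassThrough-per-destination idea mentioned above could look roughly like the sketch below. The name `BufferedFanOut`, its `write` and `createReader` methods, and the tiny 4-byte limit are hypothetical and chosen only for illustration. The sketch also ignores backpressure and never removes finished readers, so treat it as a starting point rather than a complete solution.

```javascript
var stream = require('stream')

// Hypothetical helper object: remembers the newest `maxBytes` bytes and
// hands every new reader that backlog before any live data.
function BufferedFanOut(maxBytes) {
  this.maxBytes = maxBytes
  this.buffer = Buffer.alloc(0)
  this.readers = []
}

BufferedFanOut.prototype.write = function (chunk) {
  // Append the chunk, then keep only the newest maxBytes bytes
  var combined = Buffer.concat([this.buffer, chunk])
  this.buffer = combined.subarray(Math.max(0, combined.length - this.maxBytes))
  // Forward the live chunk to every reader created so far
  this.readers.forEach(function (reader) {
    reader.write(chunk)
  })
}

BufferedFanOut.prototype.createReader = function () {
  var reader = new stream.PassThrough()
  // Prime the new reader with a copy of the current backlog
  if (this.buffer.length > 0) reader.write(Buffer.from(this.buffer))
  this.readers.push(reader)
  return reader
}

var fanOut = new BufferedFanOut(4)
fanOut.write(Buffer.from('abcdef'))   // backlog is now the last 4 bytes
var readerB = fanOut.createReader()   // primed with the current backlog
fanOut.write(Buffer.from('gh'))       // live data reaches readerB
var readerC = fanOut.createReader()   // primed with a newer backlog
```

Each reader returned by `createReader()` is an ordinary stream, so it can be piped onward with `readerB.pipe(someWritable)`. The cost, as noted in the question, is that the source object itself is no longer something you can simply pipe to and from.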

For what it's worth, I've done this with the old "flowing" API by simply listening for new handlers on data events. When a new function was attached with .on('data'), I would call it directly with a copy of the ring buffer.
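For readers unfamiliar with that trick, here is a minimal sketch of how it might be done with the `'newListener'` event that every Node.js `EventEmitter` emits just before a listener is added. This is an assumption about the mechanism, not the original code; `source` and `ringBuffer` are stand-in names, and a plain `EventEmitter` is used in place of a real stream.

```javascript
var EventEmitter = require('events').EventEmitter

// Stand-ins for the old-style stream and its ring buffer (hypothetical names)
var source = new EventEmitter()
var ringBuffer = Buffer.from('buffered data')

// 'newListener' fires just before a listener is registered, so the new
// handler can be called directly with a copy of the backlog.
source.on('newListener', function (eventName, listener) {
  if (eventName === 'data') {
    listener(Buffer.from(ringBuffer))
  }
})

var received = []
source.on('data', function (chunk) {
  received.push(chunk.toString())
})

// Live data emitted later reaches the handler as usual
source.emit('data', Buffer.from('live data'))
```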


He who fights the dragon too long becomes a dragon himself; gaze too long into the abyss, and the abyss gazes back…

1 Answer


Here's my take on your issue.

The basic idea is to create a Transform stream, which will allow us to execute your custom buffering logic before sending the data on the output of the stream:

var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  // Start with an empty buffer (Buffer.alloc replaces the deprecated `new Buffer`)
  this.buffer = Buffer.alloc(0)
}

util.inherits(BufferStream, stream.Transform)

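BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // ie. add chunk to this.buffer, check buffer size, etc.
  // Placeholder: keep a copy of the most recent chunk only
  // (Buffer.from replaces the deprecated `new Buffer`)
  this.buffer = Buffer.from(chunk)

  // Forward the chunk unchanged to the readable side
  this.push(chunk)
  done()
}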

Then, we need to override the .pipe() method so that we are notified when the BufferStream is piped into another stream, which allows us to automatically write the buffered data to it:

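BufferStream.prototype.pipe = function (destination, options) {
  // Set up the regular pipe first; pipe() returns the destination stream
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  // Replay the buffered data to the new destination.
  // The return value of write() is ignored, so backpressure is not honored here.
  if (this.buffer.length > 0) res.write(this.buffer)
  return res
}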

In this way, when we write buffer.pipe(someStream), we perform the pipe as intended and write the internal buffer to the output stream. After that, the Transform class takes care of everything, while keeping track of the backpressure and whatnot.

Here is a working gist. Please note that I didn't bother writing correct buffering logic (i.e. I don't limit the size of the internal buffer), but this should be easy to fix.
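As a rough idea of what the missing size check could look like, here is a minimal sketch. The helper name `appendBounded` and its `maxBytes` parameter are hypothetical, and it re-allocates on every chunk rather than using a true ring buffer, which keeps it short but makes it slower for large buffers.

```javascript
// Hypothetical helper: append `chunk` to `buffer` and keep only the
// newest `maxBytes` bytes, dropping the oldest data first.
function appendBounded(buffer, chunk, maxBytes) {
  var combined = Buffer.concat([buffer, chunk])
  if (combined.length <= maxBytes) return combined
  // subarray returns a view onto `combined`, not a copy
  return combined.subarray(combined.length - maxBytes)
}

// Example with a tiny 4-byte limit
var kept = appendBounded(Buffer.alloc(0), Buffer.from('abcdef'), 4)
var keptAfterMore = appendBounded(kept, Buffer.from('gh'), 4)
```

Inside `_transform`, the placeholder assignment could then become something like `this.buffer = appendBounded(this.buffer, chunk, 512 * 1024)` to match the 512KB limit from the question.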

