Here's my take on your issue.
The basic idea is to create a Transform
stream, which will allow us to execute your custom buffering logic before sending the data on the output of the stream:
var util = require('util')
var stream = require('stream')
var BufferStream = function (streamOptions) {
stream.Transform.call(this, streamOptions)
this.buffer = new Buffer('')
}
util.inherits(BufferStream, stream.Transform)
BufferStream.prototype._transform = function (chunk, encoding, done) {
// custom buffering logic
// ie. add chunk to this.buffer, check buffer size, etc.
this.buffer = new Buffer(chunk)
this.push(chunk)
done()
}
Then, we need to override the .pipe()
method so that we are are notified when the BufferStream
is piped into a stream, which allows us to automatically write data to it:
BufferStream.prototype.pipe = function (destination, options) {
var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
res.write(this.buffer)
return res
}
In this way, when we write buffer.pipe(someStream)
, we perform the pipe as intended and write the internal buffer to the output stream. After that, the Transform
class takes care of everything, while keeping track of the backpressure and whatnot.
Here is a working gist. Please note that I didn't bother writing a correct buffering logic (ie. I don't care about the size of the internal buffer), but this should be easy to fix.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…