本文整理汇总了TypeScript中stream.Writable类的典型用法代码示例。如果您正苦于以下问题:TypeScript Writable类的具体用法?TypeScript Writable怎么用?TypeScript Writable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Writable类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的TypeScript代码示例。
示例1: accepts
export function httpRespond<E>({req, res, type, value}: HttpParams<E>, callback?: ErrCallback) {
assert.instanceOf(type, AbstractType)
if (callback === undefined) callback = _ => {}
assert.instanceOf(callback, Function)
assert.instanceOf(req, http.IncomingMessage)
assert.instanceOf(res, http.OutgoingMessage)
try {
res.setHeader('Content-Type', 'application/octet-stream')
res.setHeader('sig', type.getSignature())
const acceptsGzip = accepts(req).encoding(['gzip'])
let outStream: Writable
if (acceptsGzip) {
res.setHeader('Content-Encoding', 'gzip')
outStream = zlib.createGzip() //pipe into a zip stream to decrease size of response
}
else outStream = res
function writeEndCallback(err: Error | null) {
if (err) callback!(err)
else if (!acceptsGzip) callback!(null)
}
if (req.headers.sig && req.headers.sig === type.getSignature()) { //if client already has type, only value needs to be sent
writeValue({type, value, outStream}, writeEndCallback)
}
else writeTypeAndValue({type, value, outStream}, writeEndCallback) //otherwise, type and value need to be sent
if (acceptsGzip) { //don't pipe until writing begins
outStream.pipe(res)
.on('finish', () => callback!(null))
}
}
catch (err) { callback(err) }
}
开发者ID:calebsander,项目名称:structure-bytes,代码行数:31,代码来源:io.ts
示例2: Writable
fromClusters.forEach(fromCluster => {
let clusterBalancePromise = this.getClusterBalanceDefaultZero(fromCluster);
let clusterTransactionCount = 0;
let txMerger = new Writable({
objectMode: true,
write: async (data: {key: {height: number, n: number}, value: {txid: string, balanceChange: number}}, encoding, callback) => {
clusterTransactionCount++;
let tx: ClusterTransaction = new ClusterTransaction(
data.value.txid,
data.key.height,
data.key.n,
data.value.balanceChange
);
if (!txIdToOldTransationPromise.has(tx.txid)) {
txIdToOldTransationPromise.set(tx.txid, this.getClusterTransactionDefaultUndefined(toCluster, tx.height, tx.n));
}
let txToMerge = txidToTransactionToMerge.get(tx.txid)
if (txToMerge) {
txToMerge.balanceChange += tx.balanceChange;
} else {
txidToTransactionToMerge.set(tx.txid, tx);
}
await this.db.writeBatchService.push(
this.clusterTransactionTable.delOperation({clusterId: fromCluster, height: tx.height, n: tx.n})
);
callback(null);
}
});
let nextClusterId = new ClusterId(fromCluster.height, fromCluster.txN, fromCluster.outputN+1);
this.clusterTransactionTable.createReadStream({
gte: {clusterId: fromCluster},
lt: {clusterId: nextClusterId}
}).pipe(txMerger);
txMerger.on('finish', async () => {
await this.db.writeBatchService.push(
this.clusterBalanceTable.delOperation({clusterId: fromCluster})
);
await this.db.writeBatchService.push(
this.clusterTransactionCountTable.delOperation({clusterId: fromCluster})
);
let clusterBalance = await clusterBalancePromise;
fromClusterBalanceSum += clusterBalance;
if (clusterTransactionCount > 0) {
await this.db.writeBatchService.push(
this.balanceToClusterTable.delOperation({balance: clusterBalance, clusterId: fromCluster})
);
}
clustersToMerge--;
if (clustersToMerge === 0) resolve();
});
});
开发者ID:Antti-Kaikkonen,项目名称:CoinClustering,代码行数:55,代码来源:cluster-transaction-service.ts
示例3: doProcessing
async function doProcessing() {
await db.writeBatchService.process();
let height = await rpcApi.getRpcHeight();
console.log("rpc height", height);
let lastMergedHeight: number = await blockImportService.getLastMergedHeight();
let lastSavedTxHeight: number = await blockImportService.getLastSavedTxHeight();
console.log("last saved tx height", lastSavedTxHeight);
let blockWriter: Writable;
let startHeight: number;
let toHeight: number;
if (lastMergedHeight < height-stay_behind_blocks) {
startHeight = lastMergedHeight > -1 ? lastMergedHeight + 1 : 1;
toHeight = height-stay_behind_blocks;
console.log("merging between blocks", startHeight, "and", toHeight);
blockWriter = new Writable({
objectMode: true,
write: async (block: BlockWithTransactions, encoding, callback) => {
await blockImportService.blockMerging(block);
callback(null);
}
});
} else if (lastSavedTxHeight < height-stay_behind_blocks) {
startHeight = lastSavedTxHeight > -1 ? lastSavedTxHeight + 1 : 1;
toHeight = lastMergedHeight;
console.log("saving transactions between blocks", startHeight, "and", toHeight);
blockWriter = new Writable({
objectMode: true,
write: async (block: BlockWithTransactions, encoding, callback) => {
await blockImportService.saveBlockTransactionsAsync(block);
blockTimeService.setTime(block.height, block.time);
callback(null);
}
});
} else {
setTimeout(doProcessing, 10000);
return;
}
let blockReader = blockchainReader.createReadStream(startHeight, toHeight);
blockReader.pipe(blockWriter);
let interval = setInterval(()=>{
console.log("blockWriter", blockWriter.writableLength);
}, 5000);
blockWriter.on('finish', () => {
clearInterval(interval);
setTimeout(doProcessing, 0);
});
}
开发者ID:Antti-Kaikkonen,项目名称:CoinClustering,代码行数:50,代码来源:app.ts
示例4: write
export const wrapWriter = <Msg>(w: Writable, encode?: (msg: Msg) => any) => {
return {
write(data) {
if (w.writable) {
w.write(encode ? encode(data) : data)
// Work around this bug:
// https://github.com/kawanet/msgpack-lite/issues/80
if ((w as any).encoder) (w as any).encoder.flush()
}
},
close() {
w.end()
}
} as TinyWriter<Msg>
}
开发者ID:josephg,项目名称:statecraft,代码行数:16,代码来源:tinystream.ts
示例5: onMsg
export const wrapWebSocket = <R, W>(socket: WebSocket): [TinyReader<R>, TinyWriter<W>] => {
// TODO: Consider using the code in wsclient instead to convert a socket to a reader/writer pair.
const reader: TinyReader<R> = {buf: [], isClosed: false}
socket.on("message", data => {
// if (!isProd) console.log('C->S', data)
onMsg(reader, JSON.parse(data as any))
})
// I could just go ahead and make a TinyWriter, but this handles
// backpressure.
const writer = new Writable({
objectMode: true,
write(data, _, callback) {
// if (!isProd) console.log('S->C', data)
if (socket.readyState === socket.OPEN) {
// TODO: Should this pass the callback? Will that make backpressure
// work?
socket.send(JSON.stringify(data))
}
callback()
},
})
socket.on('close', () => {
writer.end() // Does this help??
reader.isClosed = true
reader.onClose && reader.onClose()
})
return [reader, wrapWriter(writer)]
}
开发者ID:josephg,项目名称:statecraft,代码行数:32,代码来源:wsserver.ts
示例6: nodify
/// * `stream = writer.nodify()`
/// converts the writer into a native node Writable stream.
nodify() {
const self = this;
const stream = new nodeStream.Writable();
// ES2015 does not let us override method directly but we do it!
// This is fishy. Clean up later (should do it from end event).
// also very fragile because of optional args.
const anyStream: any = stream;
anyStream._write = function (chunk: any, encoding?: string, done?: Function) {
if (chunk && encoding && encoding !== 'buffer') chunk = chunk.toString(encoding);
_.run(_ => self.write(_, chunk), err => {
if (err) return stream.emit('error', err) as never;
if (done) done();
});
}
// override end to emit undefined marker
const end = stream.end;
anyStream.end = function (chunk: any, encoding?: string, cb?: (err: any, val?: any) => any) {
end.call(stream, chunk, encoding, (err: any) => {
if (err) return stream.emit('error', err) as never;
cb = cb || ((err) => { });
_.run(_ => self.write(_, undefined), cb);
});
};
return stream;
}
开发者ID:Sage,项目名称:ez-streams,代码行数:27,代码来源:writer.ts
示例7: init
async init() {
console.log("Filling block time cache...");
await new Promise((resolve, reject) => {
this.blockTimeTable.createReadStream().on('data', (data) => {
let height: number = data.key.height;
let time: number = data.value.time;
this.heightToTime.set(height, time);
}).on('end', () => {
resolve();
});
});
let targetBlock: number = await this.blockImportService.getLastSavedTxHeight();
let currentHeight = 1;
let writable = new Writable({
objectMode: true,
write: (promise: Promise<any>, encoding, callback) => {
promise.then((res) => callback());
}
});
var missingHeightTimeReader = new Readable({
objectMode: true,
read: (size) => {
while (currentHeight <= targetBlock) {
if (!this.heightToTime.has(currentHeight)) {
let height = currentHeight;
let promise = new Promise<any>(async (resolve, reject) => {
let blockHash = await this.rpcApi.getRpcBlockHash(height);
let blocks = await this.restApi.blockHeaders(1, blockHash);
let time = blocks[0].time;
this.heightToTime.set(height, time);
await this.blockTimeTable.put({height: height}, {time: time});
resolve(time);
});
missingHeightTimeReader.push(promise);
currentHeight++;
return;
}
currentHeight++;
}
missingHeightTimeReader.push(null);
}
});
await new Promise((resolve, reject) => {
writable.on('finish', () => {
resolve();
});
missingHeightTimeReader.pipe(writable);
});
console.log("Filling block time cache done");
}
开发者ID:Antti-Kaikkonen,项目名称:CoinClustering,代码行数:56,代码来源:block-time-service.ts
示例8:
/**
* Appends a contiguous set of bytes
* to the end of the written data
* @param buffer The bytes to add
*/
addAll(buffer: ArrayBuffer) {
assert.instanceOf(buffer, ArrayBuffer)
if (this.pauseCount) this.paused.addAll(buffer)
else this.outStream.write(Buffer.from(buffer))
this.writtenBytes += buffer.byteLength
return this
}
开发者ID:calebsander,项目名称:structure-bytes,代码行数:12,代码来源:appendable-stream.ts
示例9: w
const w = (taskOffset: number) => {
if (taskOffset >= tasks.length) {
if (differentialDownloader.fileMetadataBuffer != null) {
out.write(differentialDownloader.fileMetadataBuffer)
}
out.end()
return
}
const nextOffset = taskOffset + (differentialDownloader.options.useMultipleRangeRequest === false ? 1 : 1000)
_executeTasks(differentialDownloader, {
tasks,
start: taskOffset,
end: Math.min(tasks.length, nextOffset),
oldFileFd,
}, out, () => w(nextOffset), reject)
}
开发者ID:ledinhphuong,项目名称:electron-builder,代码行数:17,代码来源:multipleRangeDownloader.ts
示例10: doExecuteTasks
const w = (taskOffset: number) => {
if (taskOffset >= tasks.length) {
if (differentialDownloader.fileMetadataBuffer != null) {
out.write(differentialDownloader.fileMetadataBuffer)
}
out.end()
return
}
const nextOffset = taskOffset + 1000
doExecuteTasks(differentialDownloader, {
tasks,
start: taskOffset,
end: Math.min(tasks.length, nextOffset),
oldFileFd,
}, out, () => w(nextOffset), reject)
}
开发者ID:electron-userland,项目名称:electron-builder,代码行数:17,代码来源:multipleRangeDownloader.ts
注:本文中的stream.Writable类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论