本文整理汇总了TypeScript中kafka-node.Consumer类的典型用法代码示例。如果您正苦于以下问题:TypeScript Consumer类的具体用法?TypeScript Consumer怎么用?TypeScript Consumer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Consumer类的5个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的TypeScript代码示例。
示例1: kit
kit('publishes to a kafka topic', (done) => {
let topic = 'gustavTest-publish';
let consumer = new kafka.Consumer(client, [{
topic
}]);
let obs = new Observable(o => {
setTimeout(() => o.next('hello'), 15);
});
gr.to(topic, obs);
consumer.on('message', (message) => {
expect(message.value).to.equal('hello');
done();
});
});
开发者ID:jonesnc,项目名称:gustav,代码行数:18,代码来源:GKafka.ts
示例2: from
// two methods, one called on from, other on to
from(topic: string, offset?: number): Observable<any> {
let client = this.getClient();
let consumer = new kafka.Consumer(client, [{
topic
}]/* TODO */);
return new Observable(o => {
consumer.on('message', m => {
if (m.value === '__done') {
return o.complete();
}
o.next(m.value)
});
consumer.on('error', err => o.error(err));
return () => consumer.close(() => {});
});
}
开发者ID:jonesnc,项目名称:gustav,代码行数:20,代码来源:GustavKafka.ts
示例3: restore
//.........这里部分代码省略.........
}
}
if (_.isEmpty(restoreEventSetup)) {
this.logger.warn('No data was setup for the restore process.');
} else {
const that = this;
// Start the restore process
this.logger.warn('restoring data');
for (let topicName in restoreEventSetup) {
const topicSetup: any = restoreEventSetup[topicName];
const restoreTopic: Topic = topicSetup.topic;
const topicEvents: any = topicSetup.events;
// saving listeners for potentially subscribed events on this topic,
// so they don't get called during the restore process
const previousEvents: string[] = _.cloneDeep(restoreTopic.subscribed);
const listenersBackup = new Map<string, Function[]>();
for (let event of previousEvents) {
listenersBackup.set(event, (restoreTopic.emitter as EventEmitter).listeners(event));
await restoreTopic.removeAllListeners(event);
}
// const eventNames = _.keys(restoreTopic.events);
const baseOffset: number = topicSetup.baseOffset;
const targetOffset: number = (await restoreTopic.$offset(-1)) - 1;
const ignoreOffsets: number[] = topicSetup.ignoreOffset;
const eventNames = _.keys(topicEvents);
this.logger.debug(`topic ${topicName} has current offset ${targetOffset}`);
const consumerClient = new kafka.KafkaClient({ kafkaHost: kafkaEventsCfg.kafkaHost });
const consumer = new kafka.Consumer(
consumerClient,
[
{ topic: restoreTopic.name, offset: baseOffset }
],
{
autoCommit: true,
encoding: 'buffer',
fromOffset: true
}
);
const drainEvent = (message, done) => {
const msg = message.value;
const eventName = message.key.toString();
const context = _.pick(message, ['offset', 'partition', 'topic']);
const eventListener = topicEvents[message.key];
// decode protobuf
let decodedMsg = that.kafkaEvents.provider.decodeObject(kafkaEventsCfg, eventName, msg);
decodedMsg = _.pick(decodedMsg, _.keys(decodedMsg)); // preventing protobuf.js special fields
eventListener(decodedMsg, context, that.config, eventName).then(() => {
done();
}).catch((err) => {
that.logger.error(`Exception caught invoking restore listener for event ${eventName}:`, err);
done(err);
});
if (message.offset >= targetOffset) {
for (let event of eventNames) {
restoreTopic.removeAllListeners(event).then(() => { }).catch((err) => {
that.logger.error('Error removing listeners after restore', err);
});
}
开发者ID:restorecommerce,项目名称:chassis-srv,代码行数:67,代码来源:index.ts
示例4: Observable
return new Observable(o => {
consumer.on('message', m => {
if (m.value === '__done') {
return o.complete();
}
o.next(m.value)
});
consumer.on('error', err => o.error(err));
return () => consumer.close(() => {});
});
开发者ID:jonesnc,项目名称:gustav,代码行数:11,代码来源:GustavKafka.ts
示例5: return
return () => consumer.close(() => {});
开发者ID:jonesnc,项目名称:gustav,代码行数:1,代码来源:GustavKafka.ts
注:本文中的kafka-node.Consumer类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论