• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

TypeScript kafka-node.Consumer类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了TypeScript中kafka-node.Consumer的典型用法代码示例。如果您正苦于以下问题:TypeScript Consumer类的具体用法?TypeScript Consumer怎么用?TypeScript Consumer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Consumer类的5个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的TypeScript代码示例。

示例1: kit

  kit('publishes to a kafka topic', (done) => {
    let topic = 'gustavTest-publish';

    let consumer = new kafka.Consumer(client, [{
      topic
    }]);

    let obs = new Observable(o => {
      setTimeout(() => o.next('hello'), 15);
    });

    gr.to(topic, obs);

    consumer.on('message', (message) => {
      expect(message.value).to.equal('hello');
      done();
    });
  });
开发者ID:jonesnc,项目名称:gustav,代码行数:18,代码来源:GKafka.ts


示例2: from

  // two methods, one called on from, other on to
  from(topic: string, offset?: number): Observable<any> {
    let client = this.getClient();

    let consumer = new kafka.Consumer(client, [{
      topic
    }]/* TODO */);

    return new Observable(o => {
      consumer.on('message', m => {
        if (m.value === '__done') {
          return o.complete();
        }
        o.next(m.value)
      });
      consumer.on('error', err => o.error(err));

      return () => consumer.close(() => {});
    });
  }
开发者ID:jonesnc,项目名称:gustav,代码行数:20,代码来源:GustavKafka.ts


示例3: restore


//.........这里部分代码省略.........
        }
      }

      if (_.isEmpty(restoreEventSetup)) {
        this.logger.warn('No data was setup for the restore process.');
      } else {
        const that = this;
        // Start the restore process
        this.logger.warn('restoring data');

        for (let topicName in restoreEventSetup) {
          const topicSetup: any = restoreEventSetup[topicName];
          const restoreTopic: Topic = topicSetup.topic;
          const topicEvents: any = topicSetup.events;

          // saving listeners for potentially subscribed events on this topic,
          // so they don't get called during the restore process
          const previousEvents: string[] = _.cloneDeep(restoreTopic.subscribed);
          const listenersBackup = new Map<string, Function[]>();
          for (let event of previousEvents) {
            listenersBackup.set(event, (restoreTopic.emitter as EventEmitter).listeners(event));
            await restoreTopic.removeAllListeners(event);
          }

          // const eventNames = _.keys(restoreTopic.events);
          const baseOffset: number = topicSetup.baseOffset;
          const targetOffset: number = (await restoreTopic.$offset(-1)) - 1;
          const ignoreOffsets: number[] = topicSetup.ignoreOffset;
          const eventNames = _.keys(topicEvents);

          this.logger.debug(`topic ${topicName} has current offset ${targetOffset}`);

          const consumerClient = new kafka.KafkaClient({ kafkaHost: kafkaEventsCfg.kafkaHost });
          const consumer = new kafka.Consumer(
            consumerClient,
            [
              { topic: restoreTopic.name, offset: baseOffset }
            ],
            {
              autoCommit: true,
              encoding: 'buffer',
              fromOffset: true
            }
          );

          const drainEvent = (message, done) => {
            const msg = message.value;
            const eventName = message.key.toString();
            const context = _.pick(message, ['offset', 'partition', 'topic']);
            const eventListener = topicEvents[message.key];
            // decode protobuf
            let decodedMsg = that.kafkaEvents.provider.decodeObject(kafkaEventsCfg, eventName, msg);
            decodedMsg = _.pick(decodedMsg, _.keys(decodedMsg)); // preventing protobuf.js special fields
            eventListener(decodedMsg, context, that.config, eventName).then(() => {
              done();
            }).catch((err) => {
              that.logger.error(`Exception caught invoking restore listener for event ${eventName}:`, err);
              done(err);
            });

            if (message.offset >= targetOffset) {
              for (let event of eventNames) {
                restoreTopic.removeAllListeners(event).then(() => { }).catch((err) => {
                  that.logger.error('Error removing listeners after restore', err);
                });
              }
开发者ID:restorecommerce,项目名称:chassis-srv,代码行数:67,代码来源:index.ts


示例4: Observable

    return new Observable(o => {
      consumer.on('message', m => {
        if (m.value === '__done') {
          return o.complete();
        }
        o.next(m.value)
      });
      consumer.on('error', err => o.error(err));

      return () => consumer.close(() => {});
    });
开发者ID:jonesnc,项目名称:gustav,代码行数:11,代码来源:GustavKafka.ts


示例5: return

 return () => consumer.close(() => {});
开发者ID:jonesnc,项目名称:gustav,代码行数:1,代码来源:GustavKafka.ts



注:本文中的kafka-node.Consumer类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
TypeScript kafka-node.Producer类代码示例发布时间:2022-05-25
下一篇:
TypeScript http.get函数代码示例发布时间:2022-05-25
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap