• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ConsumerRecords类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.kafka.clients.consumer.ConsumerRecords的典型用法代码示例。如果您正苦于以下问题:Scala ConsumerRecords类的具体用法?Scala ConsumerRecords怎么用?Scala ConsumerRecords使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ConsumerRecords类的6个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ConsumerLoop

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.consumer

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.hpi.esb.datavalidator.config.KafkaConsumerConfig
import org.hpi.esb.datavalidator.util.Logging

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

/**
  * Runnable that reads partition 0 of `topic` from its beginning and appends every
  * received record to `results`.
  *
  * The loop terminates once three polls in total (not necessarily consecutive) have
  * returned no records; this is taken as the sign that the stream has finished.
  * The consumer is closed when `run` exits, whether normally or by exception.
  *
  * @param topic   name of the Kafka topic to read
  * @param config  consumer settings used to build the Kafka properties
  * @param results buffer receiving the consumed records; mutated by the thread executing `run`
  */
class ConsumerLoop(topic: String, config: KafkaConsumerConfig, results: ListBuffer[ConsumerRecord[String, String]]) extends Runnable with Logging {

  // Explicit .asJava / .asScala converters instead of the deprecated implicit JavaConversions views.
  import scala.collection.JavaConverters._

  /** Timeout handed to every `poll` call, in milliseconds. */
  private val pollTimeoutMs = 1000L

  /** Number of empty polls after which the stream is considered finished. */
  private val maxEmptyPolls = 3

  private val props = createConsumerProps()
  // Typed consumer: avoids inferring KafkaConsumer[Nothing, Nothing] and casting each poll result.
  private val consumer = new KafkaConsumer[String, String](props)

  initializeConsumer()

  /** Polls until `maxEmptyPolls` empty batches were seen, collecting all records into `results`. */
  override def run(): Unit = {
    var emptyPolls = 0

    try {
      while (emptyPolls < maxEmptyPolls) {
        val records = consumer.poll(pollTimeoutMs)

        if (records.count() == 0) {
          logger.debug("Received 0 records from Kafka.")
          emptyPolls += 1
          if (emptyPolls == maxEmptyPolls) {
            logger.debug("Received 0 records from Kafka for the third time. We assume the stream has finished and terminate.")
          }
        } else {
          records.asScala.foreach(record => results += record)
        }
      }
    } finally {
      // Release sockets and buffers even when poll() throws.
      consumer.close()
    }
  }

  /** Manually assigns partition 0 of `topic` and rewinds it to the first available offset. */
  private def initializeConsumer(): Unit = {
    val topicPartitions = List(new TopicPartition(topic, 0)).asJava
    consumer.assign(topicPartitions)
    consumer.seekToBeginning(topicPartitions)
  }

  /** Builds the Kafka consumer properties from `config`; the group id is fixed to "Validator". */
  private def createConsumerProps(): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "Validator")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.autoCommit)
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.autoCommitInterval)
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.sessionTimeout)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.keyDeserializerClass)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.valueDeserializerClass)
    props
  }
}
开发者ID:BenReissaus,项目名称:ESB-DataValidator,代码行数:62,代码来源:ConsumerLoop.scala


示例2: TweetConsumer

//设置package包名称以及导入依赖的类
package com.knoldus.kafka

import java.util
import java.util.Properties

import com.knoldus.twitter.Tweet
import com.knoldus.utils.ConfigReader
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import scala.collection.JavaConversions._

/** Console consumer that prints every record value received from the configured Kafka topic. */
class TweetConsumer {

  // Explicit .asScala converter instead of the deprecated implicit JavaConversions views.
  import scala.collection.JavaConverters._

  /**
    * Subscribes to the topic returned by `ConfigReader.getKafkaTopic` and prints each
    * record value to standard output.
    *
    * The polling loop never terminates on its own, so this method only exits by
    * exception; the consumer is closed in that case.
    *
    * @param groupId Kafka consumer group id to join
    */
  def consumeTweets(groupId: String): Unit = {
    val kafkaServer = ConfigReader.getKafkaServers
    val kafkaTopic = ConfigReader.getKafkaTopic

    val properties = new Properties()
    properties.put("bootstrap.servers", kafkaServer)
    properties.put("group.id", groupId)
    properties.put("enable.auto.commit", "true")
    properties.put("auto.commit.interval.ms", "1000")
    properties.put("auto.offset.reset", "earliest")
    properties.put("session.timeout.ms", "30000")
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // NOTE(review): values are deserialized as String while the consumer is typed with Tweet;
    // this only works because the value is merely printed - confirm the intended deserializer.
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaConsumer = new KafkaConsumer[String, Tweet](properties)
    try {
      kafkaConsumer.subscribe(util.Collections.singletonList(kafkaTopic))

      while (true) {
        val records: ConsumerRecords[String, Tweet] = kafkaConsumer.poll(100)
        // Iterate the Java Iterable directly; no intermediate List is needed.
        records.records(kafkaTopic).asScala.foreach { record =>
          println(s"Received : ${record.value()}")
        }
      }
    } finally {
      // Reached only when the loop is left by an exception: release the consumer's resources.
      kafkaConsumer.close()
    }
  }
}
开发者ID:SangeetaGulia,项目名称:activator-kafka-producer-consumer,代码行数:38,代码来源:TweetConsumer.scala


示例3: ReadyKafkaConsumer

//设置package包名称以及导入依赖的类
package com.bencassedy.readykafka.consumer

import java.util
import java.util.Properties
import java.util.concurrent.Executors
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}


/**
  * Starts background Kafka consumers that feed every received record to `msgFunc`.
  *
  * @param topics     topics each consumer subscribes to
  * @param groupId    Kafka consumer group id shared by all consumers
  * @param brokerList bootstrap servers, defaults to "localhost:9092"
  * @param msgFunc    callback applied to each record; its result is discarded
  * @tparam A result type of `msgFunc`
  */
class ReadyKafkaConsumer[A](
                             topics: List[String],
                             groupId: String,
                             brokerList: String = "localhost:9092",
                             msgFunc: (ConsumerRecord[String, String]) => A
                           ) {
  import scala.concurrent.blocking

  /** Properties shared by every consumer created in `consume`. */
  val consumerProperties = new Properties()
  consumerProperties.put("bootstrap.servers", brokerList)
  consumerProperties.put("group.id", groupId)
  consumerProperties.put("auto.offset.reset", "earliest")
  consumerProperties.put("key.deserializer", classOf[StringDeserializer])
  consumerProperties.put("value.deserializer", classOf[StringDeserializer])

  /**
    * Launches two consumers, each in its own `Future`, and returns immediately.
    *
    * Every consumer polls forever; a future therefore only completes when its loop
    * fails, in which case the consumer is closed and the failure is printed.
    */
  def consume(): Unit = {
    val numConsumers = 2
    (0 until numConsumers).foreach { _ =>
      val f = Future {
        // The loop never yields its thread: mark it as blocking so the execution context
        // can compensate and the remaining consumers still get scheduled.
        blocking {
          val consumer = new KafkaConsumer[String, String](consumerProperties)
          try {
            consumer.subscribe(topics.asJava)
            while (true) {
              val crs: ConsumerRecords[String, String] = consumer.poll(1000)
              crs.iterator().asScala.foreach(r => msgFunc(r))
            }
          } finally {
            // Release the consumer when polling or msgFunc throws.
            consumer.close()
          }
        }
      }

      f onComplete {
        case Success(_) => println("success! consumer future shutting down")
        case Failure(e) => println(s"failure! bah! exception: $e")
      }
    }
  }
}
开发者ID:bencassedy,项目名称:ready-kafka,代码行数:49,代码来源:ReadyKafkaConsumer.scala


示例4: KafkaEventSourceTest

//设置package包名称以及导入依赖的类
package process

import java.util
import java.util.Collections

import kpi.twitter.analysis.utils.{PredictedStatus, SentimentLabel, TweetSerDe}
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import twitter4j.Status


/** Tests for `KafkaEventSource`, run against a mocked `KafkaConsumer` and a `MockTime` clock. */
class KafkaEventSourceTest extends FunSuite with MockitoSugar {

  /** Topic name shared by all tests. */
  private val topicName = "fake"

  /**
    * Builds a `ConsumerRecords` batch that contains exactly one record with mocked key and value.
    *
    * NOTE(review): the record itself is created for partition 0 while it is registered
    * under partition 1 of the topic - confirm whether this mismatch is intentional.
    */
  private def singleRecordBatch(): ConsumerRecords[SentimentLabel, Status] = {
    val partition = new TopicPartition(topicName, 1)
    val onlyRecord = new ConsumerRecord[SentimentLabel, Status](topicName, 0, 0, mock[SentimentLabel], mock[Status])

    val batch = new util.HashMap[TopicPartition, util.List[ConsumerRecord[SentimentLabel, Status]]]()
    val partitionRecords = new util.ArrayList[ConsumerRecord[SentimentLabel, Status]]()
    partitionRecords.add(onlyRecord)
    batch.put(partition, partitionRecords)

    new ConsumerRecords[SentimentLabel, Status](batch)
  }

  test("subscribe should be invoked once for correct topic") {
    val consumerMock = mock[KafkaConsumer[SentimentLabel, Status]]
    val clock = new MockTime

    // Constructing the event source is the action under test.
    new KafkaEventSource(consumerMock, topicName, clock)

    verify(consumerMock, times(1)).subscribe(Collections.singletonList(topicName))
  }

  test("poll should return on max records") {
    val consumerMock = mock[KafkaConsumer[SentimentLabel, Status]]
    val clock = new MockTime

    // Each stubbed poll advances the mock clock by 1 ms and yields a single record.
    val oneRecordPerPoll = new Answer[ConsumerRecords[SentimentLabel, Status]]() {
      override def answer(invocation: InvocationOnMock): ConsumerRecords[SentimentLabel, Status] = {
        clock.sleep(1)
        singleRecordBatch()
      }
    }
    when(consumerMock.poll(1000)).thenAnswer(oneRecordPerPoll)

    val eventSource = new KafkaEventSource(consumerMock, topicName, clock)
    val polled = eventSource.poll(1000, 1)

    // One record requested: exactly one stubbed poll (1 ms on the mock clock) must suffice.
    assert(1 === polled.size)
    assert(1 === clock.currentMillis)
  }
}
开发者ID:GRpro,项目名称:TwitterAnalytics,代码行数:58,代码来源:KafkaEventSourceTest.scala


示例5: SimpleKafkaConsumer

//设置package包名称以及导入依赖的类
import java.util.Properties

import com.fasterxml.jackson.databind.KeyDeserializer
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.Deserializer
import net.manub.embeddedkafka.Codecs.stringDeserializer
import net.manub.embeddedkafka.ConsumerExtensions._

/**
  * Kafka consumer that polls a single topic on a dedicated thread and hands each
  * polled batch to a callback.
  *
  * @param consumerProps     properties used to create the underlying `KafkaConsumer`
  * @param topic             topic to subscribe to
  * @param keyDeserializer   deserializer for record keys
  * @param valueDeserializer deserializer for record values
  * @param function          callback invoked with every polled batch (including empty ones)
  * @param poll              timeout passed to each `KafkaConsumer.poll` call, defaults to 2000
  * @tparam K record key type
  * @tparam V record value type
  */
class SimpleKafkaConsumer[K,V](consumerProps : Properties,
                               topic : String,
                               keyDeserializer: Deserializer[K],
                               valueDeserializer: Deserializer[V],
                               function : ConsumerRecords[K, V] => Unit,
                               poll : Long = 2000) {

  // Written by the thread calling start()/stop() and read by the polling thread:
  // must be volatile, otherwise the polling loop may never observe stop() and join() hangs.
  @volatile private var running = false

  private val consumer = new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)


  private val thread = new Thread {
    import scala.collection.JavaConverters._

    /** Subscribes to `topic`, then polls and dispatches batches until `running` is cleared. */
    override def run(): Unit = {
      consumer.subscribe(List(topic).asJava)
      // Result intentionally discarded. NOTE(review): presumably forces a metadata fetch - confirm.
      consumer.partitionsFor(topic)

      while (running) {
        val record: ConsumerRecords[K, V] = consumer.poll(poll)
        function(record)
      }
    }
  }

  /** Starts the polling thread; does nothing when already running. */
  def start(): Unit = {
    if(!running) {
      running = true
      thread.start()
    }
  }

  /**
    * Signals the polling thread to finish, waits for it to end and closes the consumer;
    * does nothing when not running.
    */
  def stop(): Unit = {
    if(running) {
      running = false
      thread.join()
      consumer.close()
    }
  }
}
开发者ID:zalando-incubator,项目名称:remora,代码行数:50,代码来源:SimpleKafkaConsumer.scala


示例6: Consumer

//设置package包名称以及导入依赖的类
package com.knoldus.consumer

import java.util.{Properties, UUID}

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._


/**
  * Thin wrapper around a `KafkaConsumer[String, String]` subscribed to `topics`.
  *
  * @param groupId Kafka consumer group id
  * @param servers bootstrap servers
  * @param topics  topics to subscribe to
  */
class Consumer(groupId: String, servers: String, topics: List[String]) {

  // Explicit .asJava / .asScala converters instead of the deprecated implicit JavaConversions views.
  import scala.collection.JavaConverters._

  /** Timeout passed to each poll, in milliseconds. */
  private val timeout = 10000L

  val logger = LoggerFactory.getLogger(this.getClass())

  private val props: Properties = new Properties
  props.put("bootstrap.servers", servers)
  // A fresh random client id is generated for every instance.
  props.put("client.id", UUID.randomUUID.toString)
  props.put("group.id", groupId)
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)

  private val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(topics.asJava)

  /**
    * Polls Kafka once and wraps the value of every received record.
    *
    * @return the messages of the polled batch, or `Nil` when the poll was interrupted
    *         by a `WakeupException` (which is logged)
    */
  def read(): List[MessageFromKafka] = {
    try {
      logger.info(s"Reading from kafka queue ...... $topics")
      val consumerRecords: ConsumerRecords[String, String] = consumer.poll(timeout)
      consumerRecords.asScala.map(record => MessageFromKafka(record.value())).toList
    }
    catch {
      case wakeupException: WakeupException =>
        logger.error(" Getting WakeupException ", wakeupException)
        Nil
    }
  }

  /** Closes the underlying Kafka consumer. */
  def close(): Unit = consumer.close()

}

/**
  * Message wrapping the value of a single Kafka record.
  *
  * @param record the record value as a string
  */
case class MessageFromKafka(record: String) 
开发者ID:knoldus,项目名称:tweet-processing-engine,代码行数:48,代码来源:Consumer.scala



注:本文中的org.apache.kafka.clients.consumer.ConsumerRecords类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala BinaryMessage类代码示例发布时间:2022-05-23
下一篇:
Scala Image类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap