This article collects typical usage examples of the Scala class org.apache.kafka.clients.consumer.KafkaConsumer. If you have been wondering what the KafkaConsumer class does, how to use it from Scala, or what real-world usage looks like, the curated code examples below should help.
The sections below present 20 code examples of the KafkaConsumer class, sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help the system recommend better Scala code examples.
Example 1: SimpleConsumer
// Package declaration and imports
package com.landoop.kafka.ws.core.operations

import java.util.Properties

import com.landoop.kafka.ws.KafkaConstants
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object SimpleConsumer {

  // Get a simple string consumer
  def getConsumer[T](kafkaBrokers: String, maxRecords: Option[Int] = None): KafkaConsumer[T, T] = {
    val props = new Properties()
    props.put(KafkaConstants.BOOTSTRAP_SERVER, kafkaBrokers)
    props.put(KafkaConstants.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(KafkaConstants.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")
    // Cap the number of records returned by a single poll, if requested
    maxRecords.filter(_ > 0).foreach { max =>
      props.put(KafkaConstants.MAX_POLL_RECORDS, max.toString)
    }
    new KafkaConsumer[T, T](props)
  }

  def createNewConsumerWithConsumerGroup(kafkaBrokers: String, group: String): KafkaConsumer[String, String] = {
    assert(group.length > 1, "Invalid group length")
    val properties = new Properties()
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group)
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    new KafkaConsumer(properties)
  }

  // Consumer with a String key and an Avro-decoded value
  def getStringAvroConsumer[T](kafkaBrokers: String, schemaRegistry: String, maxRecords: Option[Int] = None): KafkaConsumer[String, T] = {
    val props = new Properties()
    props.put(KafkaConstants.BOOTSTRAP_SERVER, kafkaBrokers)
    props.put(KafkaConstants.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(KafkaConstants.VALUE_DESERIALIZER, classOf[KafkaAvroDeserializer].getCanonicalName)
    props.put(KafkaConstants.SCHEMA_REGISTRY_URL, schemaRegistry)
    maxRecords.filter(_ > 0).foreach { max =>
      props.put(KafkaConstants.MAX_POLL_RECORDS, max.toString)
    }
    new KafkaConsumer[String, T](props)
  }
}
Developer: Landoop | Project: kafka-ws | Lines: 58 | Source: SimpleConsumer.scala
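A minimal usage sketch for the helpers above (not part of the original project; the broker address, group id, and topic name are assumptions):

import java.util.Collections

import com.landoop.kafka.ws.core.operations.SimpleConsumer

import scala.collection.JavaConverters._

object SimpleConsumerUsage extends App {
  // Hypothetical usage: "localhost:9092", the group id, and the topic are assumptions
  val consumer = SimpleConsumer.createNewConsumerWithConsumerGroup("localhost:9092", "example-group")
  consumer.subscribe(Collections.singletonList("example-topic"))
  val records = consumer.poll(1000)
  records.iterator().asScala.foreach(r => println(s"${r.key} -> ${r.value}"))
  consumer.close()
}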
Example 2: ProductUpdateCountService
// Package declaration and imports
package com.github.simonthecat.eventdrivenorders.productservice

import java.util

import domain.Order
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Try

class ProductUpdateCountService(productsService: ProductsService,
                                orderConfirmationService: OrderConfirmationService,
                                kafkaConsumer: KafkaConsumer[String, String],
                                kafkaTopic: String) {

  import com.owlike.genson.defaultGenson._

  private var running = true

  def start() = {
    kafkaConsumer.subscribe(util.Arrays.asList(kafkaTopic))
    Future {
      while (running) {
        val records = kafkaConsumer.poll(100)
        for (record <- records.iterator()) {
          for {
            order <- Try(fromJson[Order](record.value()))
            _ <- processOrder(order)
          } yield ()
        }
      }
    }
  }

  def processOrder(order: Order): Try[Unit] = Try {
    println(s"Processing order: $order")
    if (productsService.removeProducts(order.products)) orderConfirmationService.confirm(order.id)
    else orderConfirmationService.fail(order.id)
    println(s"Processed order: $order")
  }

  def stop() = {
    running = false
    kafkaConsumer.close()
  }
}
Developer: simonko91 | Project: event-driven-orders | Lines: 56 | Source: ProductUpdateCountService.scala
Example 3: OrderProcessingService
// Package declaration and imports
package com.github.simonthecat.eventdrivenorders.orderservice

import java.util

import domain.Order
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.JavaConversions._
import scala.util.Try

class OrderProcessingService(orderConsumer: KafkaConsumer[String, String],
                             orderConsumerTopic: String,
                             storeUpdateProducer: KafkaProducer[String, String],
                             storeUpdateTopic: String) {

  import com.owlike.genson.defaultGenson._

  var running = true

  def start() = {
    orderConsumer.subscribe(util.Arrays.asList(orderConsumerTopic))
    while (running) {
      val records = orderConsumer.poll(100)
      records.iterator().foreach(processOrder)
    }
  }

  def processOrder(record: ConsumerRecord[String, String]): Unit = {
    println(s"Processing ${record.value()}")
    for {
      order <- Try(fromJson[Order](record.value()))
      _ <- Try {
        println(s"Sending to store service: $order")
        storeUpdateProducer.send(new ProducerRecord[String, String](storeUpdateTopic, toJson(order)))
      }
    } yield ()
    println(s"Processed ${record.value()}")
  }

  def stop() = {
    orderConsumer.close()
    running = false
  }
}
Developer: simonko91 | Project: event-driven-orders | Lines: 51 | Source: OrderProcessingService.scala
Example 4: ConfirmationService
// Package declaration and imports
package com.github.simonthecat.eventdrivenorders.orderservice

import java.util

import domain.{OrderStatus, UpdateStoreStatus}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Try

class ConfirmationService(confirmationConsumer: KafkaConsumer[String, String],
                          confirmationTopic: String,
                          replyProducer: KafkaProducer[String, String],
                          replyTopic: String) {

  import com.owlike.genson.defaultGenson._

  var running = true

  def start() = {
    confirmationConsumer.subscribe(util.Arrays.asList(confirmationTopic))
    Future {
      while (running) {
        val records = confirmationConsumer.poll(100)
        records.iterator().foreach(processConfirmation)
      }
    }.recover {
      case ex => ex.printStackTrace()
    }
  }

  def processConfirmation(record: ConsumerRecord[String, String]): Unit = {
    println(s"Processing ${record.value()}")
    for {
      status <- Try(fromJson[UpdateStoreStatus](record.value()))
      _ <- Try {
        println(s"Replying $status")
        replyProducer.send(new ProducerRecord(replyTopic, toJson(OrderStatus(status.orderId, status.success))))
      }
    } yield ()
    println(s"Processed ${record.value()}")
  }

  def stop() = {
    confirmationConsumer.close()
    running = false
  }
}
Developer: simonko91 | Project: event-driven-orders | Lines: 55 | Source: ConfirmationService.scala
Example 5: OrderApp
// Package declaration and imports
package com.github.simonthecat.eventdrivenorders.orderservice

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer

import scala.io.StdIn

object OrderApp extends App {

  val confirmationService = new ConfirmationService(
    confirmationConsumer = new KafkaConsumer[String, String](kafka.storeConfirmationConsumer),
    confirmationTopic = "order.confirmation",
    replyProducer = new KafkaProducer[String, String](kafka.producerCfg),
    replyTopic = "api.reply"
  )

  val orderService = new OrderProcessingService(
    new KafkaConsumer[String, String](kafka.orderConsumerCfg),
    "order.order",
    new KafkaProducer[String, String](kafka.producerCfg),
    "store.update"
  )

  confirmationService.start()
  orderService.start()

  StdIn.readLine()

  confirmationService.stop()
  orderService.stop()
}
Developer: simonko91 | Project: event-driven-orders | Lines: 32 | Source: OrderApp.scala
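The kafka configuration object referenced by OrderApp (storeConfirmationConsumer, orderConsumerCfg, producerCfg) is not shown on this page. Below is a plausible minimal reconstruction; the broker address, group ids, and serializer choices are all assumptions:

package com.github.simonthecat.eventdrivenorders.orderservice

import java.util.Properties

// Hypothetical reconstruction of the config object used by OrderApp;
// broker address, group ids, and (de)serializer choices are assumptions.
object kafka {
  private def consumerProps(groupId: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", groupId)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  val storeConfirmationConsumer: Properties = consumerProps("order-service-confirmation")
  val orderConsumerCfg: Properties = consumerProps("order-service-orders")

  val producerCfg: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }
}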
Example 6: OrderStatusService
// Package declaration and imports
package com.github.eventdrivenorders.api

import java.util

import domain.OrderStatus
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class OrderStatusService(statusConsumer: KafkaConsumer[String, String],
                         statusTopic: String) {

  import com.owlike.genson.defaultGenson._

  private var confirmations: Map[Long, String] = Map()
  private var running = true

  def getStatus(orderId: Long): Option[String] = confirmations.get(orderId)

  def addPending(orderId: Long): Unit = {
    confirmations += (orderId -> "pending")
  }

  def start() = {
    statusConsumer.subscribe(util.Arrays.asList(statusTopic))
    Future {
      while (running) {
        val records = statusConsumer.poll(100)
        for (record <- records.iterator()) {
          println("New status: " + record.value())
          val status = fromJson[OrderStatus](record.value())
          val textStatus = if (status.success) "success" else "failed"
          confirmations += (status.orderId -> textStatus)
        }
      }
    }
  }

  def stop() = {
    running = false
    statusConsumer.close()
  }
}
Developer: simonko91 | Project: event-driven-orders | Lines: 54 | Source: OrderStatusService.scala
Example 7: KafkaUtilities
// Package declaration and imports
package com.fortysevendeg.log.utils

import java.util.Properties

import kafka.admin.AdminUtils
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkConnection
import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaUtilities {

  // Uses the legacy Scala producer API (kafka.producer), which predates
  // org.apache.kafka.clients.producer.KafkaProducer
  def createKafkaProducer(): Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // props.put("partitioner.class", "com.fortysevendeg.biglog.SimplePartitioner")
    // props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("producer.type", "async")
    props.put("request.required.acks", "1")

    val config = new ProducerConfig(props)
    new Producer[String, String](config)
  }

  def createKafkaConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    new KafkaConsumer[String, String](props)
  }

  def createTopicIntoKafka(topic: String, numPartitions: Int, replicationFactor: Int): Unit = {
    val zookeeperConnect = "localhost:2181"
    val sessionTimeoutMs = 10 * 1000
    val connectionTimeoutMs = 8 * 1000
    val zkClient = ZkUtils.createZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs)
    val zkUtils = new ZkUtils(zkClient, zkConnection = new ZkConnection(zookeeperConnect), isSecure = false)
    AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, new Properties)
    zkClient.close()
  }

  // Send a single message to the given topic
  def d(kafkaProducer: Producer[String, String], topic: String, message: String) = {
    kafkaProducer.send(new KeyedMessage[String, String](topic, message))
  }
}
Developer: javipacheco | Project: spark-android-log | Lines: 52 | Source: KafkaUtilities.scala
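A hedged usage sketch tying the helpers together. It assumes ZooKeeper at localhost:2181 and a broker at localhost:9092; the topic name is also an assumption. Because createKafkaConsumer sets no group.id, the sketch assigns a partition explicitly rather than subscribing:

import java.util.Collections

import com.fortysevendeg.log.utils.KafkaUtilities._
import org.apache.kafka.common.TopicPartition

object KafkaUtilitiesUsage extends App {
  // Assumes ZooKeeper at localhost:2181 and a broker at localhost:9092
  createTopicIntoKafka("android-log", numPartitions = 1, replicationFactor = 1)

  val producer = createKafkaProducer()
  d(producer, "android-log", "hello from the producer")
  producer.close()

  // No group.id in the consumer props, so assign the partition explicitly;
  // with the default offset reset (latest) this may see 0 records unless
  // messages arrive after the assignment.
  val consumer = createKafkaConsumer()
  consumer.assign(Collections.singletonList(new TopicPartition("android-log", 0)))
  val records = consumer.poll(1000)
  println(s"received ${records.count()} record(s)")
  consumer.close()
}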
Example 8: Consumer
// Package declaration and imports
package co.s4n.infrastructure.kafka

import java.util.concurrent._
import java.util.{ Collections, Properties }

import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }

import scala.collection.JavaConverters._

object Consumer {

  def createConsumerConfig(): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaProducerExample")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  def run(): Unit = {
    val consumer = new KafkaConsumer[String, String](createConsumerConfig())
    consumer.subscribe(Collections.singletonList("UsersTopic"))
    // Poll on a dedicated thread so run() returns immediately
    Executors.newSingleThreadExecutor.execute(() => {
      while (true) {
        val records = consumer.poll(1000)
        for (record <- records.iterator().asScala) {
          println("\n\n Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset())
        }
      }
    })
  }
}
Developer: bazzo03 | Project: users-api | Lines: 41 | Source: Consumer.scala
Example 9: ConsumerLoop
// Package declaration and imports
package org.hpi.esb.datavalidator.consumer

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.hpi.esb.datavalidator.config.KafkaConsumerConfig
import org.hpi.esb.datavalidator.util.Logging

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

class ConsumerLoop(topic: String, config: KafkaConsumerConfig, results: ListBuffer[ConsumerRecord[String, String]]) extends Runnable with Logging {

  private val props = createConsumerProps()
  // Typing the consumer here makes the asInstanceOf cast on poll() unnecessary
  private val consumer = new KafkaConsumer[String, String](props)
  initializeConsumer()

  override def run(): Unit = {
    var running = true
    var zeroCount = 0
    while (running) {
      val records = consumer.poll(1000)
      if (records.count() == 0) {
        logger.debug("Received 0 records from Kafka.")
        zeroCount += 1
        if (zeroCount == 3) {
          logger.debug("Received 0 records from Kafka for the third time. We assume the stream has finished and terminate.")
          running = false
        }
      }
      for (record <- records) {
        results.append(record)
      }
    }
    consumer.close()
  }

  private def initializeConsumer(): Unit = {
    // Assign a single partition explicitly and start reading from the beginning
    val topicPartitions = List(new TopicPartition(topic, 0))
    consumer.assign(topicPartitions)
    consumer.seekToBeginning(topicPartitions)
  }

  private def createConsumerProps(): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "Validator")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.autoCommit)
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.autoCommitInterval)
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.sessionTimeout)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.keyDeserializerClass)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.valueDeserializerClass)
    props
  }
}
Developer: BenReissaus | Project: ESB-DataValidator | Lines: 62 | Source: ConsumerLoop.scala
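A usage sketch for ConsumerLoop. It assumes KafkaConsumerConfig is a case class whose field names match the ones read in createConsumerProps; that is an inference from the code above, not the project's actual definition:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.hpi.esb.datavalidator.config.KafkaConsumerConfig
import org.hpi.esb.datavalidator.consumer.ConsumerLoop

import scala.collection.mutable.ListBuffer

object ConsumerLoopUsage extends App {
  // Hypothetical construction; field names are inferred from createConsumerProps()
  val config = KafkaConsumerConfig(
    bootstrapServers = "localhost:9092",
    autoCommit = "true",
    autoCommitInterval = "1000",
    sessionTimeout = "30000",
    keyDeserializerClass = "org.apache.kafka.common.serialization.StringDeserializer",
    valueDeserializerClass = "org.apache.kafka.common.serialization.StringDeserializer")

  val results = ListBuffer[ConsumerRecord[String, String]]()
  val thread = new Thread(new ConsumerLoop("validation-topic", config, results))
  thread.start()
  thread.join() // ConsumerLoop stops itself after three consecutive empty polls
  println(s"collected ${results.size} records")
}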
Example 10: TweetConsumer
// Package declaration and imports
package com.knoldus.kafka

import java.util
import java.util.Properties

import com.knoldus.twitter.Tweet
import com.knoldus.utils.ConfigReader
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

import scala.collection.JavaConversions._

class TweetConsumer {

  def consumeTweets(groupId: String): Unit = {
    val kafkaServer = ConfigReader.getKafkaServers
    val kafkaTopic = ConfigReader.getKafkaTopic

    val properties = new Properties()
    properties.put("bootstrap.servers", kafkaServer)
    properties.put("group.id", groupId)
    properties.put("enable.auto.commit", "true")
    properties.put("auto.commit.interval.ms", "1000")
    properties.put("auto.offset.reset", "earliest")
    properties.put("session.timeout.ms", "30000")
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    // NOTE: the properties register a StringDeserializer for values while the
    // consumer is typed KafkaConsumer[String, Tweet]; reading a value as Tweet
    // will fail at runtime unless a deserializer that produces Tweet is configured.
    val kafkaConsumer = new KafkaConsumer[String, Tweet](properties)
    kafkaConsumer.subscribe(util.Collections.singletonList(kafkaTopic))
    while (true) {
      val records: ConsumerRecords[String, Tweet] = kafkaConsumer.poll(100)
      records.records(kafkaTopic).iterator().toList.foreach { record =>
        println(s"Received : ${record.value()}")
      }
    }
  }
}
Developer: SangeetaGulia | Project: activator-kafka-producer-consumer | Lines: 38 | Source: TweetConsumer.scala
Example 11: Consumer
// Package declaration and imports
package com.stulsoft.kafka2.consumer

import java.util.{Collections, Properties}

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.KafkaConsumer

object Consumer extends App with LazyLogging {
  logger.info("Started consumer")
  getMessages()

  def getMessages(): Unit = {
    val props = new Properties
    props.put("bootstrap.servers", "localhost:9092,localhost:9093")
    props.put("group.id", "test")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("myClusterTopic"))
    while (true) {
      val records = consumer.poll(100)
      records.forEach(record => {
        val resultText = s"""Received message.\n\tPartition = ${record.partition()}, offset is ${record.offset}, topic is "${record.topic()}" key is "${record.key}", value is "${record.value}""""
        logger.info(resultText)
      })
      consumer.commitSync()
    }
  }

  // Never reached: getMessages() loops forever
  logger.info("Finished consumer")
}
Developer: ysden123 | Project: poc | Lines: 37 | Source: Consumer.scala
Example 12: Consumer
// Package declaration and imports
package com.stulsoft.consumer

import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.{Logger, LoggerFactory}

/**
  * @author Yuriy Stul.
  */
object Consumer extends App {
  val logger: Logger = LoggerFactory.getLogger(Consumer.getClass)
  logger.info("Started consumer")
  readMessages()

  /**
    * Reads messages
    */
  def readMessages(): Unit = {
    val props = new Properties
    props.put("bootstrap.servers", "localhost:9092,localhost:9093")
    props.put("group.id", "test")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("myClusterTopic"))
    while (true) {
      val records = consumer.poll(100)
      records.forEach(record => {
        logger.info(s"offset = ${record.offset}, key = ${record.key}, value = ${record.value}")
      })
      consumer.commitSync()
    }
    // consumer.unsubscribe()
    // consumer.close()
  }
}
Developer: ysden123 | Project: poc | Lines: 43 | Source: Consumer.scala
Example 13: Main
// Package declaration and imports
package bidding.client.console

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer

object Main {

  def produce() = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("acks", "all")
    props.put("retries", Int.box(0))
    props.put("batch.size", Int.box(16384))
    props.put("linger.ms", Int.box(1))
    props.put("buffer.memory", Int.box(33554432))
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val bidder = new Bidder(producer, "GooglePixelXL4343", 1000)
    for (_ <- 1 to 10) bidder.bid()
    producer.close()
  }

  def consume() = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "group_1")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList("my-replicated-topic"))
    // consumer.seekToBeginning(util.Arrays.asList(new TopicPartition("my-replicated-topic", 0)))
    while (true) {
      val records = consumer.poll(100)
      records.forEach { record =>
        println(s"offset = ${record.offset}, key = ${record.key}, value = ${record.value}")
      }
    }
    // Never reached: the loop above runs forever
    consumer.close()
  }

  def main(args: Array[String]): Unit = {
    consume()
    //produce()
    println("--")
  }
}
Developer: oleksandr-iskhakov | Project: bidding-client-console | Lines: 56 | Source: Main.scala
Example 14: KafkaFactorySpec
// Package declaration and imports
package articlestreamer.shared.kafka

import java.util.Properties

import articlestreamer.shared.BaseSpec
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

class KafkaFactorySpec extends BaseSpec {

  val factory = new KafkaFactory[String, String]

  val serverProps = new Properties()
  serverProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:8080")
  serverProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  serverProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

  val consumerProps = new Properties()
  consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:8080")
  consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

  "Factory" should "provide a producer" in {
    val producer = factory.getProducer(serverProps)
    producer.close()
    producer shouldBe a [KafkaProducer[_, _]]
  }

  "Factory" should "provide a consumer" in {
    val consumer = factory.getConsumer(consumerProps)
    consumer.close()
    consumer shouldBe a [KafkaConsumer[_, _]]
  }
}
Developer: firens | Project: article-streamer | Lines: 38 | Source: KafkaFactorySpec.scala
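The KafkaFactory under test is not shown on this page. Here is a minimal sketch of what it plausibly looks like, namely a thin wrapper that builds clients from caller-supplied Properties; this is an assumption, not the project's actual code:

package articlestreamer.shared.kafka

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer

// Hypothetical reconstruction of the factory exercised by KafkaFactorySpec
class KafkaFactory[K, V] {
  def getProducer(props: Properties): KafkaProducer[K, V] = new KafkaProducer[K, V](props)
  def getConsumer(props: Properties): KafkaConsumer[K, V] = new KafkaConsumer[K, V](props)
}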
Example 15: KafkaEventSourceTest
// Package declaration and imports
package process

import java.util
import java.util.Collections

import kpi.twitter.analysis.utils.{PredictedStatus, SentimentLabel, TweetSerDe}
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar
import twitter4j.Status

class KafkaEventSourceTest extends FunSuite with MockitoSugar {

  test("subscribe should be invoked once for correct topic") {
    val topicName = "fake"
    val mockConsumer = mock[KafkaConsumer[SentimentLabel, Status]]
    val mockTime = new MockTime

    val kafkaEventSource = new KafkaEventSource(mockConsumer, topicName, mockTime)

    verify(mockConsumer, times(1)).subscribe(Collections.singletonList(topicName))
  }

  test("poll should return on max records") {
    val topicName = "fake"
    val mockConsumer = mock[KafkaConsumer[SentimentLabel, Status]]
    val mockTime = new MockTime

    // Stub poll() to return exactly one record per call, advancing the mock clock
    when(mockConsumer.poll(1000)).thenAnswer(new Answer[ConsumerRecords[SentimentLabel, Status]]() {
      override def answer(invocation: InvocationOnMock): ConsumerRecords[SentimentLabel, Status] = {
        mockTime.sleep(1)
        val tp = new TopicPartition(topicName, 1)
        val record = new ConsumerRecord[SentimentLabel, Status](topicName, 0, 0, mock[SentimentLabel], mock[Status])
        val recordsMap = new util.HashMap[TopicPartition, util.List[ConsumerRecord[SentimentLabel, Status]]]()
        val recordsList = new util.ArrayList[ConsumerRecord[SentimentLabel, Status]]()
        recordsList.add(record)
        recordsMap.put(tp, recordsList)
        new ConsumerRecords[SentimentLabel, Status](recordsMap)
      }
    })

    val kafkaEventSource = new KafkaEventSource(mockConsumer, topicName, mockTime)
    val records = kafkaEventSource.poll(1000, 1)

    assert(1 === records.size)
    assert(1 === mockTime.currentMillis)
  }
}
Developer: GRpro | Project: TwitterAnalytics | Lines: 58 | Source: KafkaEventSourceTest.scala
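The MockTime helper is likewise not shown. A minimal sketch consistent with how the test uses it (a manually advanced clock exposing sleep and currentMillis); the real class may well implement a broader Time trait:

package process

// Hypothetical reconstruction: only the members the test touches are shown
class MockTime {
  var currentMillis: Long = 0L
  def sleep(millis: Long): Unit = currentMillis += millis
}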
Example 16: KafkaConsoleConsumer
// Package declaration and imports
package kpi.twitter.analysis.tools.kafka

import java.util.{Collections, Properties, UUID}

import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaConsoleConsumer {

  def createConsumer(): Consumer[String, String] = {
    val consumerProperties = new Properties()
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094")
    // A random group id means this console consumer never shares offsets with other runs
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString)
    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    val consumer = new KafkaConsumer[String, String](consumerProperties, new StringDeserializer, new StringDeserializer)
    consumer.subscribe(Collections.singletonList("predicted-sentiment-tweets"))
    consumer
  }

  def main(args: Array[String]) {
    val consumer = createConsumer()
    while (true) {
      val it = consumer.poll(1000).iterator()
      // if (!it.hasNext) {
      //   println("empty :(")
      // }
      while (it.hasNext) {
        val record = it.next()
        println(s"${record.key()} : ${record.value()}")
      }
    }
  }
}
Developer: GRpro | Project: TwitterAnalytics | Lines: 37 | Source: KafkaConsoleConsumer.scala
Example 17: KafkaStatusActor
// Package declaration and imports
package org.ardlema.kafka.status

import java.util.Properties

import akka.actor.{ Actor, ActorRef }
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.json4s.jackson.Serialization

import scala.concurrent.duration.FiniteDuration

class KafkaStatusActor(router: ActorRef, delay: FiniteDuration, interval: FiniteDuration) extends Actor {

  import scala.concurrent.ExecutionContext.Implicits.global

  // Periodically push the Kafka status to the router as JSON
  context.system.scheduler.schedule(delay, interval) {
    implicit val formats = org.json4s.DefaultFormats
    router ! Serialization.write(getKafkaStatus)
  }

  val kafkaProperties = {
    val props = new Properties()
    props.put("bootstrap.servers", "127.0.0.1:9092")
    props.put("group.id", "kafka-status-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  lazy val kafkaConsumer = new KafkaConsumer[String, String](kafkaProperties)

  override def receive: Actor.Receive = {
    case _ => // just ignore any messages
  }

  def getStats: Map[String, Long] = {
    val baseStats = Map[String, Long](
      "count.procs" -> Runtime.getRuntime.availableProcessors(),
      "count.mem.free" -> Runtime.getRuntime.freeMemory(),
      "count.mem.maxMemory" -> Runtime.getRuntime.maxMemory(),
      "count.mem.totalMemory" -> Runtime.getRuntime.totalMemory()
    )
    baseStats
  }

  // Map each topic name to its partition count
  def getKafkaStatus = {
    import scala.collection.JavaConversions._
    kafkaConsumer.listTopics().map(e => e._1 -> e._2.size)
    //kafkaConsumer.listTopics.mapValues(_.toSet).map(e => ("topic" -> e._1))
  }
}
Developer: ardlema | Project: kafka-watcher | Lines: 51 | Source: KafkaStatusActor.scala
Example 18: KafkaConsumerWrapper
// Package declaration and imports
package articlestreamer.processor.kafka

import java.util
import java.util.{Properties, UUID}

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}

import scala.collection.JavaConversions._
import scala.concurrent.duration.Duration

class KafkaConsumerWrapper {

  private val consumer = new KafkaConsumer[String, String](KafkaConsumerWrapper.properties)
  consumer.subscribe(util.Arrays.asList(KafkaConsumerWrapper.topic))

  def poll(duration: Duration, count: Int): List[String] = {
    println("Polling started.")
    val millis = duration.toMillis
    val values: List[String] = (1 to count)
      .flatMap(_ => consumer.poll(millis))
      .foldLeft(List[String]()) { (values: List[String], record: ConsumerRecord[String, String]) =>
        // TODO at the moment elements are added to the beginning of the list, so the result is in reverse order
        record.value() :: values
      }
    println("Polling completed.")
    values
  }

  def stopConsumer() = {
    println("Stopping consumer.")
    consumer.close()
  }
}

object KafkaConsumerWrapper {
  private val appConfig = ConfigFactory.load()

  val topic = appConfig.getString("kafka.topic")

  val properties = new Properties()
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-" + UUID.randomUUID().toString)
  properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
  properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "5000")
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}
Developer: firens | Project: article-streamer-processor | Lines: 62 | Source: KafkaConsumerWrapper.scala
Example 19: SimpleKafkaConsumer
// Package declaration and imports
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.Deserializer

class SimpleKafkaConsumer[K, V](consumerProps: Properties,
                                topic: String,
                                keyDeserializer: Deserializer[K],
                                valueDeserializer: Deserializer[V],
                                function: ConsumerRecords[K, V] => Unit,
                                poll: Long = 2000) {

  private var running = false
  private val consumer = new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)

  private val thread = new Thread {
    import scala.collection.JavaConverters._

    override def run(): Unit = {
      consumer.subscribe(List(topic).asJava)
      // Force a metadata fetch so the consumer connects before the first poll
      consumer.partitionsFor(topic)
      while (running) {
        val record: ConsumerRecords[K, V] = consumer.poll(poll)
        function(record)
      }
    }
  }

  def start(): Unit = {
    if (!running) {
      running = true
      thread.start()
    }
  }

  def stop(): Unit = {
    if (running) {
      running = false
      thread.join()
      consumer.close()
    }
  }
}
Developer: zalando-incubator | Project: remora | Lines: 50 | Source: SimpleKafkaConsumer.scala
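A usage sketch for SimpleKafkaConsumer (not from the remora project; the broker address, group id, and topic name are assumptions):

import java.util.Properties

import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.JavaConverters._

object SimpleKafkaConsumerUsage extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // assumed broker
  props.put("group.id", "remora-example")          // assumed group id

  val consumer = new SimpleKafkaConsumer[String, String](
    consumerProps = props,
    topic = "example-topic",
    keyDeserializer = new StringDeserializer,
    valueDeserializer = new StringDeserializer,
    function = records => records.iterator().asScala.foreach(r => println(r.value()))
  )

  consumer.start()
  Thread.sleep(5000) // let it poll for a few seconds
  consumer.stop()
}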
Example 20: Consumer
// Package declaration and imports
package com.knoldus.consumer

import java.util.{Properties, UUID}

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._

class Consumer(groupId: String, servers: String, topics: List[String]) {
  private val timeout = 10000

  val logger = LoggerFactory.getLogger(this.getClass)

  private val props: Properties = new Properties
  props.put("bootstrap.servers", servers)
  props.put("client.id", UUID.randomUUID.toString)
  props.put("group.id", groupId)
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)

  private val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(topics)

  def read(): List[MessageFromKafka] = {
    try {
      logger.info("Reading from kafka queue ...... " + topics)
      val consumerRecords: ConsumerRecords[String, String] = consumer.poll(timeout)
      consumerRecords.map(record => MessageFromKafka(record.value())).toList
    } catch {
      case wakeupException: WakeupException =>
        logger.error(" Getting WakeupException ", wakeupException)
        Nil
    }
  }

  def close(): Unit = consumer.close()
}

case class MessageFromKafka(record: String)
Developer: knoldus | Project: tweet-processing-engine | Lines: 48 | Source: Consumer.scala
Note: The org.apache.kafka.clients.consumer.KafkaConsumer class examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects contributed by the community; copyright of the source code remains with the original authors. Refer to each project's license before distributing or using the code, and do not reproduce without permission.