• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala KafkaConsumer类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.kafka.clients.consumer.KafkaConsumer的典型用法代码示例。如果您正苦于以下问题：Scala KafkaConsumer类的具体用法？Scala KafkaConsumer怎么用？Scala KafkaConsumer使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了KafkaConsumer类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: SimpleConsumer

//设置package包名称以及导入依赖的类
package com.landoop.kafka.ws.core.operations

import java.util.Properties

import com.landoop.kafka.ws.KafkaConstants
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import kafka.utils.VerifiableProperties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

/** Factory methods that build pre-configured Kafka consumers. */
object SimpleConsumer {

  // Same value as the literal class name, but checked by the compiler.
  private val StringDeserializerName: String = classOf[StringDeserializer].getName

  /** Copies a strictly positive `maxRecords` into `props`; `None` and values <= 0 are ignored. */
  private def applyMaxPollRecords(props: Properties, maxRecords: Option[Int]): Unit =
    maxRecords.filter(_ > 0).foreach { max =>
      props.put(KafkaConstants.MAX_POLL_RECORDS, max.toString)
    }

  /**
    * Builds a consumer whose key and value deserializers are both the Kafka string deserializer.
    *
    * @param kafkaBrokers bootstrap server list
    * @param maxRecords   optional cap on records per poll; applied only when positive
    * @tparam T static type given to keys and values. Both deserializers are string
    *           deserializers, so any `T` other than `String` is not backed by the configuration.
    */
  def getConsumer[T](kafkaBrokers: String, maxRecords: Option[Int] = None): KafkaConsumer[T, T] = {
    val props = new Properties()
    props.put(KafkaConstants.BOOTSTRAP_SERVER, kafkaBrokers)
    props.put(KafkaConstants.KEY_DESERIALIZER, StringDeserializerName)
    props.put(KafkaConstants.VALUE_DESERIALIZER, StringDeserializerName)
    applyMaxPollRecords(props, maxRecords)
    new KafkaConsumer[T, T](props)
  }

  /**
    * Builds a string/string consumer that belongs to `group`, with auto-commit disabled
    * and a 30 second session timeout.
    *
    * @param kafkaBrokers bootstrap server list
    * @param group        consumer group id; asserted to be longer than one character
    */
  def createNewConsumerWithConsumerGroup(kafkaBrokers: String, group: String): KafkaConsumer[String, String] = {
    assert(group.length > 1, "Invalid group length")
    val properties = new Properties()
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group)
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializerName)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializerName)
    new KafkaConsumer[String, String](properties)
  }

  /**
    * Builds a consumer with string keys and Avro-deserialized values.
    *
    * @param kafkaBrokers   bootstrap server list
    * @param schemaRegistry schema registry URL handed to the Avro deserializer
    * @param maxRecords     optional cap on records per poll; applied only when positive
    * @tparam T static type given to the deserialized values
    */
  def getStringAvroConsumer[T](kafkaBrokers: String, schemaRegistry: String, maxRecords: Option[Int] = None): KafkaConsumer[String, T] = {
    val props = new Properties()
    props.put(KafkaConstants.BOOTSTRAP_SERVER, kafkaBrokers)
    props.put(KafkaConstants.KEY_DESERIALIZER, StringDeserializerName)
    props.put(KafkaConstants.VALUE_DESERIALIZER, classOf[KafkaAvroDeserializer].getCanonicalName)
    props.put(KafkaConstants.SCHEMA_REGISTRY_URL, schemaRegistry)
    applyMaxPollRecords(props, maxRecords)
    new KafkaConsumer[String, T](props)
  }

}
开发者ID:Landoop,项目名称:kafka-ws,代码行数:58,代码来源:SimpleConsumer.scala


示例2: ProductUpdateCountService

//设置package包名称以及导入依赖的类
package com.github.simonthecat.eventdrivenorders.productservice

import java.util

import domain.Order
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Try


/**
  * Consumes JSON-encoded orders from `kafkaTopic`, removes the ordered products through
  * `productsService`, and reports the outcome through `orderConfirmationService`.
  *
  * The poll loop runs inside a `Future`. `KafkaConsumer` is not safe for multi-threaded
  * access, so the consumer is closed by the polling thread itself; `stop()` only signals it.
  */
class ProductUpdateCountService(productsService: ProductsService,
                                orderConfirmationService: OrderConfirmationService,
                                kafkaConsumer: KafkaConsumer[String, String],
                                kafkaTopic: String) {

  import com.owlike.genson.defaultGenson._

  // Written by stop() on the caller's thread and read by the polling thread.
  @volatile private var running = true
  // Lets stop() know whether a polling thread exists that will close the consumer.
  @volatile private var started = false

  /** Subscribes to the topic and starts the background poll loop. */
  def start() = {
    kafkaConsumer.subscribe(util.Arrays.asList(kafkaTopic))
    started = true

    Future {
      try {
        while (running) {
          val records = kafkaConsumer.poll(100)
          for (record <- records.iterator()) {
            val outcome = for {
              order <- Try(fromJson[Order](record.value()))
              _ <- processOrder(order)
            } yield ()
            // A bad record must not stop the loop, but it should not vanish silently either.
            outcome.failed.foreach(e => println(s"Failed to process record: $e"))
          }
        }
      } catch {
        // wakeup() issued by stop() interrupts a blocked poll; that is the normal shutdown path.
        case _: org.apache.kafka.common.errors.WakeupException if !running => ()
      } finally {
        kafkaConsumer.close()
      }
    }
  }

  /**
    * Removes the order's products and confirms or fails the order accordingly.
    *
    * @return a `Failure` wrapping any exception raised by the collaborating services
    */
  def processOrder(order: Order): Try[Unit] = Try {
    println(s"Processing order: $order")

    if (productsService.removeProducts(order.products)) orderConfirmationService.confirm(order.id)
    else orderConfirmationService.fail(order.id)

    println(s"Processed order: $order")
  }

  /** Requests shutdown; wakeup() is the one consumer call that is safe from another thread. */
  def stop() = {
    running = false
    if (started) kafkaConsumer.wakeup() else kafkaConsumer.close()
  }

}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:56,代码来源:ProductUpdateCountService.scala


示例3: OrderProcessingService

//设置package包名称以及导入依赖的类
package com.github.simonthecat.eventdrivenorders.orderservice

import java.util

import domain.Order
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.JavaConversions._
import scala.util.Try


/**
  * Reads JSON-encoded orders from `orderConsumerTopic` and forwards each one, re-encoded
  * as JSON, to `storeUpdateTopic`.
  *
  * `start()` runs the poll loop on the calling thread and returns only after `stop()` has
  * been called from another thread, or after the loop fails.
  */
class OrderProcessingService(orderConsumer: KafkaConsumer[String, String],
                             orderConsumerTopic: String,
                             storeUpdateProducer: KafkaProducer[String, String],
                             storeUpdateTopic: String) {

  import com.owlike.genson.defaultGenson._

  // Written by stop() on one thread and read by the loop in start() on another.
  @volatile var running = true
  // Lets stop() know whether start() owns the consumer and will close it.
  @volatile private var started = false

  /** Subscribes and polls until stopped; the consumer is closed on the polling thread. */
  def start() = {
    orderConsumer.subscribe(util.Arrays.asList(orderConsumerTopic))
    started = true

    try {
      while (running) {
        val records = orderConsumer.poll(100)
        records.iterator().foreach(processOrder)
      }
    } catch {
      // wakeup() issued by stop() interrupts a blocked poll; that is the normal shutdown path.
      case _: org.apache.kafka.common.errors.WakeupException if !running => ()
    } finally {
      orderConsumer.close()
    }
  }

  /** Parses one record as an `Order` and sends it to the store service; failures are logged. */
  def processOrder(record: ConsumerRecord[String, String]): Unit = {
    println(s"Processing ${record.value()}")

    val outcome = for {
      order <- Try(fromJson[Order](record.value()))
      _ <- Try {
        println(s"Sending to store service: $order")
        storeUpdateProducer.send(new ProducerRecord[String, String](storeUpdateTopic, toJson(order)))
      }
    } yield ()
    outcome.failed.foreach(e => println(s"Failed to process ${record.value()}: $e"))

    println(s"Processed ${record.value()}")

  }

  /**
    * Requests shutdown. The flag is cleared before the consumer is touched, and only
    * wakeup() is used across threads because `KafkaConsumer` is not thread-safe.
    */
  def stop() = {
    running = false
    if (started) orderConsumer.wakeup() else orderConsumer.close()
  }
}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:51,代码来源:OrderProcessingService.scala


示例4: ConfirmationService

//设置package包名称以及导入依赖的类
package com.github.simonthecat.eventdrivenorders.orderservice

import java.util

import domain.{OrderStatus, UpdateStoreStatus}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Try


/**
  * Reads JSON-encoded `UpdateStoreStatus` messages from `confirmationTopic` and publishes
  * the matching `OrderStatus`, as JSON, to `replyTopic`.
  *
  * The poll loop runs inside a `Future`. `KafkaConsumer` is not safe for multi-threaded
  * access, so the consumer is closed by the polling thread itself; `stop()` only signals it.
  */
class ConfirmationService(confirmationConsumer: KafkaConsumer[String, String],
                          confirmationTopic: String,
                          replyProducer: KafkaProducer[String, String],
                          replyTopic: String) {

  import com.owlike.genson.defaultGenson._

  // Written by stop() on the caller's thread and read by the polling thread.
  @volatile var running = true
  // Lets stop() know whether a polling thread exists that will close the consumer.
  @volatile private var started = false

  /** Subscribes and starts the background poll loop; loop failures are printed. */
  def start() = {
    confirmationConsumer.subscribe(util.Arrays.asList(confirmationTopic))
    started = true
    Future {
      try {
        while (running) {
          val records = confirmationConsumer.poll(100)
          records.iterator().foreach(processConfirmation)
        }
      } catch {
        // wakeup() issued by stop() interrupts a blocked poll; that is the normal shutdown path.
        case _: org.apache.kafka.common.errors.WakeupException if !running => ()
      } finally {
        confirmationConsumer.close()
      }
    }.recover {
      case ex => ex.printStackTrace()
    }
  }

  /** Parses one record and sends the reply; parse or send failures are logged. */
  def processConfirmation(record: ConsumerRecord[String, String]): Unit = {
    println(s"Processing ${record.value()}")

    val outcome = for {
      status <- Try(fromJson[UpdateStoreStatus](record.value()))
      _ <- Try {
        println(s"Replying $status")
        replyProducer.send(new ProducerRecord(replyTopic, toJson(OrderStatus(status.orderId, status.success))))
      }
    } yield ()
    outcome.failed.foreach(e => println(s"Failed to process ${record.value()}: $e"))

    println(s"Processed ${record.value()}")
  }

  /** Requests shutdown; wakeup() is the one consumer call that is safe from another thread. */
  def stop() = {
    running = false
    if (started) confirmationConsumer.wakeup() else confirmationConsumer.close()
  }
}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:55,代码来源:ConfirmationService.scala


示例5: OrderApp

//设置package包名称以及导入依赖的类
package com.github.simonthecat.eventdrivenorders.orderservice

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer

import scala.io.StdIn


/**
  * Entry point of the order service: wires the confirmation and order-processing
  * services to their Kafka clients, starts both, and stops both once a line has been
  * read from standard input.
  */
object OrderApp extends App {

  // Listens on "order.confirmation" and replies on "api.reply".
  val confirmationService = new ConfirmationService(
    new KafkaConsumer[String, String](kafka.storeConfirmationConsumer),
    "order.confirmation",
    new KafkaProducer[String, String](kafka.producerCfg),
    "api.reply"
  )

  // Listens on "order.order" and forwards to "store.update".
  val orderService = new OrderProcessingService(
    orderConsumer = new KafkaConsumer[String, String](kafka.orderConsumerCfg),
    orderConsumerTopic = "order.order",
    storeUpdateProducer = new KafkaProducer[String, String](kafka.producerCfg),
    storeUpdateTopic = "store.update"
  )

  confirmationService.start()
  // NOTE(review): if OrderProcessingService.start() polls on the calling thread, the
  // statements below run only after that loop has ended - confirm the intended shutdown path.
  orderService.start()

  StdIn.readLine()
  confirmationService.stop()
  orderService.stop()
}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:32,代码来源:OrderApp.scala


示例6: OrderStatusService

//设置package包名称以及导入依赖的类
package com.github.eventdrivenorders.api

import java.util

import domain.OrderStatus
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future


/**
  * Keeps the latest known status text ("pending", "success" or "failed") per order id,
  * updated from JSON-encoded `OrderStatus` messages read from `statusTopic`.
  *
  * The status map is written by the polling thread and by callers of `addPending`, and
  * read by callers of `getStatus`, so it is a concurrent map rather than a reassigned `var`.
  */
class OrderStatusService(statusConsumer: KafkaConsumer[String, String],
                         statusTopic: String) {

  import com.owlike.genson.defaultGenson._

  private val confirmations = scala.collection.concurrent.TrieMap.empty[Long, String]
  // Written by stop() on the caller's thread and read by the polling thread.
  @volatile private var running = true
  // Lets stop() know whether a polling thread exists that will close the consumer.
  @volatile private var started = false

  /** @return the last recorded status for `orderId`, or `None` if the order is unknown */
  def getStatus(orderId: Long): Option[String] = confirmations.get(orderId)

  /** Records `orderId` as "pending", replacing any previous status. */
  def addPending(orderId: Long): Unit = {
    confirmations.put(orderId, "pending")
  }

  /** Subscribes to the status topic and starts the background poll loop. */
  def start() = {
    statusConsumer.subscribe(util.Arrays.asList(statusTopic))
    started = true

    Future {
      try {
        while (running) {
          val records = statusConsumer.poll(100)
          for (record <- records.iterator()) {
            println("New status: " + record.value())
            try {
              val status = fromJson[OrderStatus](record.value())
              val textStatus = if (status.success) "success" else "failed"
              confirmations.put(status.orderId, textStatus)
            } catch {
              // One malformed message must not end status tracking for every other order.
              case scala.util.control.NonFatal(e) =>
                println(s"Ignoring unreadable status message: $e")
            }
          }
        }
      } catch {
        // wakeup() issued by stop() interrupts a blocked poll; that is the normal shutdown path.
        case _: org.apache.kafka.common.errors.WakeupException if !running => ()
      } finally {
        statusConsumer.close()
      }
    }
  }

  /** Requests shutdown; wakeup() is the one consumer call that is safe from another thread. */
  def stop() = {
    running = false
    if (started) statusConsumer.wakeup() else statusConsumer.close()
  }

}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:54,代码来源:OrderStatusService.scala


示例7: KafkaUtilities

//设置package包名称以及导入依赖的类
package com.fortysevendeg.log.utils

import java.util.Properties

import kafka.admin.AdminUtils
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkConnection
import org.apache.kafka.clients.consumer.KafkaConsumer

/** Helpers for creating Kafka clients and topics against a broker on localhost. */
object KafkaUtilities {

  /** Builds an asynchronous string producer (old Scala producer API) for localhost:9092. */
  def createKafkaProducer(): Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
//    props.put("partitioner.class", "com.fortysevendeg.biglog.SimplePartitioner")
//    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
//    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("producer.type", "async")
    props.put("request.required.acks", "1")

    val config = new ProducerConfig(props)
    new Producer[String, String](config)
  }

  /** Builds a string/string consumer for localhost:9092; no group id is configured. */
  def createKafkaConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    new KafkaConsumer[String, String](props)
  }

  /**
    * Creates `topic` through ZooKeeper at localhost:2181.
    *
    * @param topic             name of the topic to create
    * @param numPartitions     number of partitions
    * @param replicationFactor replication factor
    */
  def createTopicIntoKafka(topic: String, numPartitions: Int, replicationFactor: Int): Unit = {
    val zookeeperConnect = "localhost:2181"
    val sessionTimeoutMs = 10 * 1000
    val connectionTimeoutMs = 8 * 1000

    val zkClient = ZkUtils.createZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs)
    try {
      val zkUtils = new ZkUtils(zkClient, zkConnection = new ZkConnection(zookeeperConnect), isSecure = false)
      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, new Properties)
    } finally {
      // Release the ZooKeeper session even when topic creation throws.
      zkClient.close()
    }
  }

  /** Sends `message` to `topic` without a key. */
  def d(kafkaProducer: Producer[String, String], topic: String, message: String) = {
    kafkaProducer.send(new KeyedMessage[String, String](topic, message))
  }

}
开发者ID:javipacheco,项目名称:spark-android-log,代码行数:52,代码来源:KafkaUtilities.scala


示例8: Consumer

//设置package包名称以及导入依赖的类
package co.s4n.infrastructure.kafka

import java.util.concurrent._
import java.util.{ Collections, Properties }

import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }

import scala.collection.JavaConverters._


/** Console consumer for "UsersTopic" that polls on a dedicated background thread. */
object Consumer {

  /** Builds the consumer settings: localhost broker, auto-commit every second, string deserializers. */
  def createConsumerConfig(): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaProducerExample")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  /**
    * Subscribes to "UsersTopic" and starts an endless poll loop on a single-thread
    * executor, printing every received record. Returns immediately.
    */
  def run(): Unit = {
    val consumer = new KafkaConsumer[String, String](createConsumerConfig())
    consumer.subscribe(Collections.singletonList("UsersTopic"))

    val executor = Executors.newSingleThreadExecutor
    executor.execute(() => {
      try {
        while (true) {
          val records = consumer.poll(1000)

          for (record <- records.iterator().asScala) {
            println("\n\n Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset())
          }
        }
      } finally {
        // Reached only when polling fails; close on the thread that used the consumer.
        consumer.close()
      }
    })
    // The submitted task still runs; this only lets the worker thread exit once the task ends.
    executor.shutdown()
  }

}
开发者ID:bazzo03,项目名称:users-api,代码行数:41,代码来源:Consumer.scala


示例9: ConsumerLoop

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.consumer

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.hpi.esb.datavalidator.config.KafkaConsumerConfig
import org.hpi.esb.datavalidator.util.Logging

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

/**
  * Reads partition 0 of `topic` from the beginning and appends every record to `results`
  * until three polls in total have come back empty.
  *
  * @param topic   topic to read
  * @param config  consumer settings (servers, commit behaviour, timeouts, deserializer classes)
  * @param results buffer that receives the consumed records
  */
class ConsumerLoop(topic: String, config: KafkaConsumerConfig, results: ListBuffer[ConsumerRecord[String, String]]) extends Runnable with Logging {

  private val props = createConsumerProps()
  // Typed explicitly so poll() yields ConsumerRecords[String, String] without a cast.
  private val consumer = new KafkaConsumer[String, String](props)

  initializeConsumer()

  /** Polls until the stream is assumed finished, then closes the consumer (also on failure). */
  override def run(): Unit = {

    var running = true
    var zeroCount = 0

    try {
      while (running) {
        val records = consumer.poll(1000)

        if (records.count() == 0) {
          logger.debug(s"Received 0 records from Kafka.")
          zeroCount += 1
          if (zeroCount == 3) {
            logger.debug("Received 0 records from Kafka for the third time. We assume the stream has finished and terminate.")
            running = false
          }
        }

        for (record <- records) {
          results.append(record)
        }
      }
    } finally {
      consumer.close()
    }
  }

  /** Assigns partition 0 of the topic explicitly and rewinds it to the first offset. */
  private def initializeConsumer(): Unit = {
    val topicPartitions = List(new TopicPartition(topic, 0))
    consumer.assign(topicPartitions)
    consumer.seekToBeginning(topicPartitions)
  }

  /** Translates `config` into Kafka consumer properties with the fixed group id "Validator". */
  private def createConsumerProps(): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, s"Validator")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.autoCommit)
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.autoCommitInterval)
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.sessionTimeout)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.keyDeserializerClass)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.valueDeserializerClass)
    props
  }
}
开发者ID:BenReissaus,项目名称:ESB-DataValidator,代码行数:62,代码来源:ConsumerLoop.scala


示例10: TweetConsumer

//设置package包名称以及导入依赖的类
package com.knoldus.kafka

import java.util
import java.util.Properties

import com.knoldus.twitter.Tweet
import com.knoldus.utils.ConfigReader
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import scala.collection.JavaConversions._

/** Prints every tweet record received from the configured Kafka topic. */
class TweetConsumer {

  /**
    * Subscribes to the configured topic as a member of `groupId` and prints records
    * forever. Returns only by throwing; the consumer is closed in that case.
    *
    * @param groupId consumer group id
    */
  def consumeTweets(groupId: String): Unit = {
    val kafkaServer = ConfigReader.getKafkaServers
    val kafkaTopic = ConfigReader.getKafkaTopic

    val properties = new Properties()
    properties.put("bootstrap.servers", kafkaServer)
    properties.put("group.id", groupId)
    properties.put("enable.auto.commit", "true")
    properties.put("auto.commit.interval.ms", "1000")
    properties.put("auto.offset.reset", "earliest")
    properties.put("session.timeout.ms", "30000")
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // NOTE(review): values are declared as Tweet below but decoded with the string
    // deserializer here - confirm which of the two is intended.
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaConsumer = new KafkaConsumer[String, Tweet](properties)
    try {
      kafkaConsumer.subscribe(util.Collections.singletonList(kafkaTopic))

      while (true) {
        val records: ConsumerRecords[String, Tweet] = kafkaConsumer.poll(100)
        // Iterate directly; there is no need to materialise a List for every poll.
        records.records(kafkaTopic).iterator().foreach { record =>
          println(s"Received : ${record.value()}")
        }
      }
    } finally {
      kafkaConsumer.close()
    }
  }
}
开发者ID:SangeetaGulia,项目名称:activator-kafka-producer-consumer,代码行数:38,代码来源:TweetConsumer.scala


示例11: Consumer

//设置package包名称以及导入依赖的类
package com.stulsoft.kafka2.consumer

import java.util.{Collections, Properties}

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.KafkaConsumer


/** Standalone application that logs every message received from "myClusterTopic". */
object Consumer extends App with LazyLogging {
  logger.info("Started consumer")

  getMessages()

  /**
    * Creates a string/string consumer in group "test", subscribes to "myClusterTopic"
    * and logs each record in an endless loop, committing offsets after every poll.
    */
  def getMessages(): Unit = {
    val settings = Seq(
      "bootstrap.servers" -> "localhost:9092,localhost:9093",
      "group.id" -> "test",
      "enable.auto.commit" -> "true",
      "auto.commit.interval.ms" -> "1000",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val props = new Properties
    settings.foreach { case (key, value) => props.put(key, value) }

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("myClusterTopic"))
    while (true) {
      val batch = consumer.poll(100).iterator()
      while (batch.hasNext) {
        val record = batch.next()
        val message = s"""Received message.\n\tPartition = ${record.partition()}, offset is ${record.offset}, topic is "${record.topic()}" key is "${record.key}", value is "${record.value}""""
        logger.info(message)
      }
      consumer.commitSync()
    }
  }

  logger.info("Finished consumer")
}
开发者ID:ysden123,项目名称:poc,代码行数:37,代码来源:Consumer.scala


示例12: Consumer

//设置package包名称以及导入依赖的类
package com.stulsoft.consumer

import org.slf4j.{Logger, LoggerFactory}
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

/**
  * Standalone application that logs every message received from "myClusterTopic".
  *
  * @author Yuriy Stul.
  */
object Consumer extends App {
  val logger: Logger = LoggerFactory.getLogger(Consumer.getClass)
  logger.info("Started consumer")
  readMessages()

  /**
    * Creates a string/string consumer in group "test", subscribes to "myClusterTopic"
    * and logs offset, key and value of each record in an endless loop, committing
    * offsets after every poll.
    */
  def readMessages(): Unit = {
    val settings = Seq(
      "bootstrap.servers" -> "localhost:9092,localhost:9093",
      "group.id" -> "test",
      "enable.auto.commit" -> "true",
      "auto.commit.interval.ms" -> "1000",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val props = new Properties
    settings.foreach { case (key, value) => props.put(key, value) }

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("myClusterTopic"))
    while (true) {
      val batch = consumer.poll(100).iterator()
      while (batch.hasNext) {
        val record = batch.next()
        logger.info(s"offset = ${record.offset}, key = ${record.key}, value = ${record.value}")
      }
      consumer.commitSync()
    }

    // The loop above never exits, so no unsubscribe/close is performed here.
  }
}
开发者ID:ysden123,项目名称:poc,代码行数:43,代码来源:Consumer.scala


示例13: Main

//设置package包名称以及导入依赖的类
package bidding.client.console

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer


/** Console client that either places bids (producer) or prints received records (consumer). */
object Main {

  /** Sends ten bids through a string producer on localhost:9092, then closes the producer. */
  def produce(): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("acks", "all")
    props.put("retries", Int.box(0))
    props.put("batch.size", Int.box(16384))
    props.put("linger.ms", Int.box(1))
    props.put("buffer.memory", Int.box(33554432))
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      val bidder = new Bidder(producer, "GooglePixelXL4343", 1000)
      for (_ <- 1 to 10) bidder.bid()
    } finally {
      // Close (and flush) the producer even when a bid fails.
      producer.close()
    }
  }

  /**
    * Subscribes to "my-replicated-topic" in group "group_1" and prints records forever.
    * Returns only by throwing; the consumer is closed in that case.
    */
  def consume(): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "group_1")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(util.Arrays.asList("my-replicated-topic"))
//    consumer.seekToBeginning(util.Arrays.asList(new TopicPartition("my-replicated-topic", 0)))
      while (true) {
        val records = consumer.poll(100)
        records.forEach { record =>
          println(s"offset = ${record.offset}, key = ${record.key}, value = ${record.value}")
        }
      }
    } finally {
      // A close() placed after the endless loop would be unreachable; finally is not.
      consumer.close()
    }
  }

  /** Runs the consumer; switch the calls to produce bids instead. */
  def main(args: Array[String]): Unit = {
    consume()
    //produce()
    println("--")
  }
}
开发者ID:oleksandr-iskhakov,项目名称:bidding-client-console,代码行数:56,代码来源:Main.scala


示例14: KafkaFactorySpec

//设置package包名称以及导入依赖的类
package articlestreamer.shared.kafka

import java.util.Properties

import articlestreamer.shared.BaseSpec
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}


/** Checks that `KafkaFactory` returns real Kafka producer and consumer instances. */
class KafkaFactorySpec extends BaseSpec {

  // Address used for both clients; no connection is expected to be made by these tests.
  private val bootstrapAddress = "http://localhost:8080"

  /** Builds a `Properties` object from the given key/value pairs. */
  private def propertiesOf(entries: (String, String)*): Properties = {
    val result = new Properties()
    entries.foreach { case (key, value) => result.setProperty(key, value) }
    result
  }

  val factory = new KafkaFactory[String, String]

  val serverProps = propertiesOf(
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> bootstrapAddress,
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
  )

  val consumerProps = propertiesOf(
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> bootstrapAddress,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
  )

  "Factory" should "provide a producer" in {
    val created = factory.getProducer(serverProps)
    // Release the client first; the type check below does not need it open.
    created.close()
    created shouldBe a [KafkaProducer[_, _]]
  }

  "Factory" should "provide a consumer" in {
    val created = factory.getConsumer(consumerProps)
    created.close()
    created shouldBe a [KafkaConsumer[_, _]]
  }

}
开发者ID:firens,项目名称:article-streamer,代码行数:38,代码来源:KafkaFactorySpec.scala


示例15: KafkaEventSourceTest

//设置package包名称以及导入依赖的类
package process

import java.util
import java.util.Collections

import kpi.twitter.analysis.utils.{PredictedStatus, SentimentLabel, TweetSerDe}
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import twitter4j.Status


/** Unit tests for `KafkaEventSource` that run against a mocked `KafkaConsumer`. */
class KafkaEventSourceTest extends FunSuite with MockitoSugar {

  /** Builds a poll result holding a single mocked record for `topic`. */
  private def singleRecordBatch(topic: String): ConsumerRecords[SentimentLabel, Status] = {
    val partition = new TopicPartition(topic, 1)
    val onlyRecord = new ConsumerRecord[SentimentLabel, Status](topic, 0, 0, mock[SentimentLabel], mock[Status])
    val perPartition = new util.ArrayList[ConsumerRecord[SentimentLabel, Status]]()
    perPartition.add(onlyRecord)
    val byPartition = new util.HashMap[TopicPartition, util.List[ConsumerRecord[SentimentLabel, Status]]]()
    byPartition.put(partition, perPartition)
    new ConsumerRecords[SentimentLabel, Status](byPartition)
  }

  test("subscribe should be invoked once for correct topic") {
    val topicName = "fake"
    val consumerMock = mock[KafkaConsumer[SentimentLabel, Status]]
    val clock = new MockTime

    // Constructing the source is what should trigger the subscription.
    new KafkaEventSource(consumerMock, topicName, clock)

    verify(consumerMock, times(1)).subscribe(Collections.singletonList(topicName))
  }

  test("poll should return on max records") {
    val topicName = "fake"
    val consumerMock = mock[KafkaConsumer[SentimentLabel, Status]]
    val clock = new MockTime

    // Each mocked poll advances the clock by one millisecond and returns one record.
    when(consumerMock.poll(1000)).thenAnswer(new Answer[ConsumerRecords[SentimentLabel, Status]]() {
      override def answer(invocation: InvocationOnMock): ConsumerRecords[SentimentLabel, Status] = {
        clock.sleep(1)
        singleRecordBatch(topicName)
      }
    })

    val source = new KafkaEventSource(consumerMock, topicName, clock)
    val polled = source.poll(1000, 1)

    assert(1 === polled.size)
    assert(1 === clock.currentMillis)
  }
}
开发者ID:GRpro,项目名称:TwitterAnalytics,代码行数:58,代码来源:KafkaEventSourceTest.scala


示例16: KafkaConsoleConsumer

//设置package包名称以及导入依赖的类
package kpi.twitter.analysis.tools.kafka

import java.util.{Collections, Properties, UUID}

import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

/** Console tool that prints key and value of every record on "predicted-sentiment-tweets". */
object KafkaConsoleConsumer {

  /**
    * Creates a string/string consumer for localhost:9094 with a random group id and
    * auto-commit disabled, already subscribed to "predicted-sentiment-tweets".
    */
  def createConsumer(): Consumer[String, String] = {
    val consumerProperties = new Properties()
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094")
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString)
    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

    val consumer = new KafkaConsumer[String, String](consumerProperties, new StringDeserializer, new StringDeserializer)

    consumer.subscribe(Collections.singletonList("predicted-sentiment-tweets"))
    consumer
  }

  /** Polls forever and prints each record; the consumer is closed if polling fails. */
  def main(args: Array[String]): Unit = {
    val consumer = createConsumer()

    try {
      while (true) {
        val it = consumer.poll(1000).iterator()
        while (it.hasNext) {
          val record = it.next()
          println(s"${record.key()} : ${record.value()}")
        }
      }
    } finally {
      consumer.close()
    }
  }
}
开发者ID:GRpro,项目名称:TwitterAnalytics,代码行数:37,代码来源:KafkaConsoleConsumer.scala


示例17: KafkaStatusActor

//设置package包名称以及导入依赖的类
package org.ardlema.kafka.status

import java.util.Properties

import akka.actor.{ Actor, ActorRef }
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.json4s.jackson.Serialization
import scala.concurrent.duration.FiniteDuration

/**
  * Actor that periodically sends the Kafka topic overview, serialized as JSON, to `router`.
  *
  * @param router   recipient of the JSON status messages
  * @param delay    time before the first status message
  * @param interval time between subsequent status messages
  */
class KafkaStatusActor(router: ActorRef, delay: FiniteDuration, interval: FiniteDuration) extends Actor {

  import scala.concurrent.ExecutionContext.Implicits.global

  val kakfaProperties = {
    val props = new Properties()
    props.put("bootstrap.servers", "127.0.0.1:9092")
    props.put("group.id", "kafka-status-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  lazy val kafkaConsumer = new KafkaConsumer(kakfaProperties)

  // Scheduled only after the properties above are initialised, so that a tick firing
  // immediately cannot observe a half-constructed actor.
  private val statusTick = context.system.scheduler.schedule(delay, interval) {
    implicit val formats = org.json4s.DefaultFormats
    router ! Serialization.write(getKafkaStatus)
  }

  /** Cancels the periodic task so it does not keep running after the actor has stopped. */
  override def postStop(): Unit = {
    statusTick.cancel()
    super.postStop()
  }

  override def receive: Actor.Receive = {
    case _ => // just ignore any messages
  }

  /** @return processor count and free/max/total memory of the current JVM */
  def getStats: Map[String, Long] = {

    val baseStats = Map[String, Long](
      "count.procs" -> Runtime.getRuntime.availableProcessors().toLong,
      "count.mem.free" -> Runtime.getRuntime.freeMemory(),
      "count.mem.maxMemory" -> Runtime.getRuntime.maxMemory(),
      "count.mem.totalMemory" -> Runtime.getRuntime.totalMemory()
    )
    baseStats
  }

  /** @return each topic name mapped to the number of partitions the consumer reports for it */
  def getKafkaStatus = {
    import scala.collection.JavaConversions._
    kafkaConsumer.listTopics().map(e => e._1 -> e._2.size)
    //kafkaConsumer.listTopics.mapValues(_.toSet).map(e => ("topic" -> e._1))
  }
}
开发者ID:ardlema,项目名称:kafka-watcher,代码行数:51,代码来源:KafkaStatusActor.scala


示例18: KafkaConsumerWrapper

//设置package包名称以及导入依赖的类
package articlestreamer.processor.kafka

import java.util
import java.util.{UUID, Properties}

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerConfig, KafkaConsumer}

import scala.collection.JavaConversions._
import scala.concurrent.duration.Duration

/**
  * Thin wrapper around a string/string `KafkaConsumer` that is subscribed, on
  * construction, to the topic configured in the companion object.
  */
class KafkaConsumerWrapper {

  private val consumer = new KafkaConsumer[String, String](KafkaConsumerWrapper.properties)

  consumer.subscribe(util.Arrays.asList(KafkaConsumerWrapper.topic))

  /**
    * Polls the consumer `count` times and collects the record values.
    *
    * @param duration timeout applied to each individual poll
    * @param count    number of polls to perform
    * @return the values in the order they were received
    */
  def poll(duration: Duration, count: Int): List[String] = {

    println("Polling started.")

    val millis = duration.toMillis

    // Mapping in place keeps arrival order; folding with prepend would reverse it.
    val values: List[String] = (1 to count)
      .flatMap( _ => {
        consumer.poll(millis)
      })
      .map((record: ConsumerRecord[String, String]) => record.value())
      .toList

    println("Polling Completed.")

    values
  }

  /** Closes the underlying consumer. */
  def stopConsumer() = {
    println("Stopping consumer.")
    consumer.close()
  }

}

/** Topic name and consumer settings shared by all `KafkaConsumerWrapper` instances. */
object KafkaConsumerWrapper {

  private val appConfig = ConfigFactory.load()

  /** Topic to consume, read from the `kafka.topic` configuration entry. */
  val topic = appConfig.getString("kafka.topic")

  /** Consumer settings; the group id gets a random suffix each time this object is initialised. */
  val properties = {
    val settings = new Properties()
    Seq(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> s"test-${UUID.randomUUID().toString}",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
      ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "1000",
      ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> "30000",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG -> "5000",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    ).foreach { case (key, value) => settings.put(key, value) }
    settings
  }

}
开发者ID:firens,项目名称:article-streamer-processor,代码行数:62,代码来源:KafkaConsumerWrapper.scala


示例19: SimpleKafkaConsumer

//设置package包名称以及导入依赖的类
import java.util.Properties

import com.fasterxml.jackson.databind.KeyDeserializer
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.Deserializer
import net.manub.embeddedkafka.Codecs.stringDeserializer
import net.manub.embeddedkafka.ConsumerExtensions._

/**
  * Runs a Kafka poll loop on a dedicated thread and hands every poll result to `function`.
  *
  * @param consumerProps     consumer configuration
  * @param topic             topic to subscribe to
  * @param keyDeserializer   deserializer for record keys
  * @param valueDeserializer deserializer for record values
  * @param function          callback invoked with the result of every poll, including empty ones
  * @param poll              poll timeout in milliseconds
  */
class SimpleKafkaConsumer[K,V](consumerProps : Properties,
                               topic : String,
                               keyDeserializer: Deserializer[K],
                               valueDeserializer: Deserializer[V],
                               function : ConsumerRecords[K, V] => Unit,
                               poll : Long = 2000) {

  // Written by start()/stop() on the caller's thread and read by the polling thread;
  // without @volatile the loop is not guaranteed to ever observe the stop request.
  @volatile private var running = false

  private val consumer = new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)


  private val thread = new Thread {
    import scala.collection.JavaConverters._

    override def run(): Unit = {
      consumer.subscribe(List(topic).asJava)
      consumer.partitionsFor(topic)

      while (running) {
        val record: ConsumerRecords[K, V] = consumer.poll(poll)
        function(record)
      }
    }
  }

  /** Starts the polling thread; does nothing when already running. */
  def start(): Unit = {
    if(!running) {
      running = true
      thread.start()
    }
  }

  /**
    * Stops polling, waits for the polling thread to finish (up to one poll timeout),
    * then closes the consumer. Does nothing when not running.
    */
  def stop(): Unit = {
    if(running) {
      running = false
      // Join first so the consumer is never closed while the other thread is inside poll().
      thread.join()
      consumer.close()
    }
  }
}
开发者ID:zalando-incubator,项目名称:remora,代码行数:50,代码来源:SimpleKafkaConsumer.scala


示例20: Consumer

//设置package包名称以及导入依赖的类
package com.knoldus.consumer

import java.util.{Properties, UUID}

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._


/**
  * String/string Kafka consumer subscribed to `topics` on construction.
  *
  * @param groupId consumer group id
  * @param servers bootstrap server list
  * @param topics  topics to subscribe to
  */
class Consumer(groupId: String, servers: String, topics: List[String]) {

  // Poll timeout in milliseconds.
  private val timeout = 10000

  val logger = LoggerFactory.getLogger(this.getClass())

  private val props: Properties = new Properties
  Seq(
    "bootstrap.servers" -> servers,
    "client.id" -> UUID.randomUUID.toString,
    "group.id" -> groupId,
    "key.deserializer" -> classOf[StringDeserializer].getName,
    "value.deserializer" -> classOf[StringDeserializer].getName
  ).foreach { case (key, value) => props.put(key, value) }

  private val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(topics)

  /**
    * Polls once and wraps every record value in a `MessageFromKafka`.
    *
    * @return the messages of this poll, or `Nil` when the poll was interrupted by a wakeup
    */
  def read(): List[MessageFromKafka] =
    try {
      logger.info(s"Reading from kafka queue ...... $topics")
      val polled: ConsumerRecords[String, String] = consumer.poll(timeout)
      (for (entry <- polled) yield MessageFromKafka(entry.value())).toList
    } catch {
      case interrupted: WakeupException =>
        logger.error(" Getting WakeupException ", interrupted)
        Nil
    }

  /** Closes the underlying Kafka consumer. */
  def close(): Unit = consumer.close()

}

/**
  * Value of a single record read from Kafka.
  *
  * @param record the record's value as a string
  */
case class MessageFromKafka(record: String) 
开发者ID:knoldus,项目名称:tweet-processing-engine,代码行数:48,代码来源:Consumer.scala



注:本文中的org.apache.kafka.clients.consumer.KafkaConsumer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala UnionRDD类代码示例发布时间:2022-05-23
下一篇:
Scala IntWritable类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap