Scala ConsumerRecord Class Code Examples


This article collects typical usage examples of the org.apache.kafka.clients.consumer.ConsumerRecord class in Scala. If you are wondering how the ConsumerRecord class is used in Scala, or are looking for concrete examples, the curated class examples below may help.



The following presents 20 code examples of the ConsumerRecord class, ordered by popularity by default.
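
Before diving into the examples, here is a minimal standalone sketch (not taken from any of the projects below) of the ConsumerRecord accessors that recur throughout; the topic name, key, and value used here are placeholders.

import org.apache.kafka.clients.consumer.ConsumerRecord

object ConsumerRecordBasics extends App {
  // Build a record directly with the simple constructor: topic, partition, offset, key, value.
  val record = new ConsumerRecord[String, String]("example-topic", 0, 42L, "example-key", "example-value")

  // The accessors used throughout the examples below.
  println(s"topic=${record.topic()} partition=${record.partition()} offset=${record.offset()}")
  println(s"key=${record.key()} value=${record.value()}")
}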

Example 1: ProcessingKafkaApplication

// Package declaration and imports
package com.packt.chapter8

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.duration._

object ProcessingKafkaApplication extends App {
  implicit val actorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("akka_streams_group")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val tickSource = Source.tick(0 seconds, 5 seconds, "Hello from Akka Streams using Kafka!")
    val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
    val kafkaSink = Producer.plainSink(producerSettings)
    val printlnSink = Sink.foreach(println)

    val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
    val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())

    tickSource  ~> mapToProducerRecord   ~> kafkaSink
    kafkaSource ~> mapFromConsumerRecord ~> printlnSink

    ClosedShape
  })

  runnableGraph.run()
} 
Developer: PacktPublishing, Project: Akka-Cookbook, Lines of code: 51, Source file: ProcessingKafkaApplication.scala


Example 2: KafkaUtility

// Package declaration and imports
package com.knoldus.streaming.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}


object KafkaUtility {

  //TODO It should read from config
  private val kafkaParams = Map(
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "earliest",
    "group.id" -> "tweet-consumer"
  )

  private val preferredHosts = LocationStrategies.PreferConsistent


  def createDStreamFromKafka(ssc: StreamingContext, topics: List[String]): InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream[String, String](
      ssc,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
    )

} 
Developer: knoldus, Project: real-time-stream-processing-engine, Lines of code: 32, Source file: KafkaUtility.scala
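
As a hedged usage sketch of the helper above (not part of the original project), the stream it creates can be wired into a StreamingContext and drained with a simple action; the topic name, Spark master, and batch interval here are illustrative assumptions.

import com.knoldus.streaming.kafka.KafkaUtility
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaUtilityUsage extends App {
  // Illustrative Spark settings; adjust the master and app name to your environment.
  val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaUtilityUsage")
  val ssc = new StreamingContext(conf, Seconds(5))

  // Wire the direct stream created by the helper into a simple printing pipeline.
  KafkaUtility.createDStreamFromKafka(ssc, List("tweets"))
    .map(_.value())
    .print()

  ssc.start()
  ssc.awaitTermination()
}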


Example 3: PacketConsumer

// Package declaration and imports
package edu.uw.at.iroberts.wirefugue.kafka.consumer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.typesafe.config.ConfigFactory
import edu.uw.at.iroberts.wirefugue.kafka.producer.KafkaKey
import edu.uw.at.iroberts.wirefugue.kafka.serdes.{PacketDeserializer, PacketSerde}
import edu.uw.at.iroberts.wirefugue.pcap.Packet
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.IntegerDeserializer

import scala.concurrent.Await
import scala.concurrent.duration._


object PacketConsumer extends App {
  type PacketRecord = ConsumerRecord[KafkaKey, Array[Byte]]
  val config = ConfigFactory.load("application.conf")

  implicit val system = ActorSystem("stream-consumer-system", config)
  implicit val materializer = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new IntegerDeserializer, new PacketDeserializer)
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  // Separate streams for each partition
  val maxPartitions = 100
  val consumerGroup = Consumer.plainPartitionedSource(consumerSettings, Subscriptions.topics("packets"))

  val done = consumerGroup.map {
    case (topicPartition, source) =>
      val p: Int = topicPartition.partition
      source
        .map { (cr: ConsumerRecord[Integer, Packet]) => cr.value() }
        .filter(_.ip.isDefined)
        .toMat(Sink.foreach(packet => println(s"[$p] $packet")))(Keep.both)
        .run()
  }
    .mapAsyncUnordered(maxPartitions)(_._2)
    .runWith(Sink.ignore)

  Await.result(done, Duration.Inf)

  system.terminate()
} 
Developer: robertson-tech, Project: wirefugue, Lines of code: 50, Source file: PacketConsumer.scala


Example 4: StreamConsumer

// Package declaration and imports
package consumers

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import cats.data.Xor
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import io.circe._
import io.circe.generic.auto._
import cats.data.Xor.{Left, Right}
import model.Employee

import scala.concurrent.Future

object StreamConsumer  extends App{
  implicit val actorSystem = ActorSystem("consumer-actors", ConfigFactory.load())
  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem))

  lazy val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group13")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")//"latest")
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")

  lazy val subscription = Subscriptions.topics("raw-data-1")
  lazy val db = new Processor()
  Consumer.plainSource(consumerSettings, subscription)
      .mapAsync(4){
        db.processMessage
      }
    .runWith(Sink.ignore)

}

class Processor {
  def processMessage(record: ConsumerRecord[Array[Byte], String]): Future[Done] ={
    println(s"DB.save: ${record.value()}")

    Option(record.value()).foreach{ jsonString =>
      val mayBeEmp: Xor[Error, Employee] = jawn.decode[Employee](jsonString)
      mayBeEmp match {
        case Left(error) => println(error)
        case Right(emp) => println(s"employee name: ${emp.name}")
      }
    }
    Future.successful(Done)  }
} 
Developer: ajit-scala, Project: kafka-consumers, Lines of code: 55, Source file: StreamConsumer.scala


Example 5: TopicHandler

// Package declaration and imports
package org.hpi.esb.datavalidator.kafka

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.hpi.esb.util.OffsetManagement

case class TopicHandler(topicName: String, numberOfMessages: Long, topicSource: Source[ConsumerRecord[String, String], Consumer.Control])

object TopicHandler {

  def create(topicName: String, system: ActorSystem): TopicHandler = {

    val uuid = java.util.UUID.randomUUID.toString
    val consumerSettings: ConsumerSettings[String, String] = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("192.168.30.208:9092,192.168.30.207:9092,192.168.30.141:9092")
      .withGroupId(uuid)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Int.MaxValue.toString)
      .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Int.MaxValue.toString)
      .withProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20485000")

    val partition = 0
    val topicSource = createSource(consumerSettings, topicName, partition)
    val numberOfMessages = OffsetManagement.getNumberOfMessages(topicName, partition)

    new TopicHandler(topicName, numberOfMessages, topicSource)
  }

  def createSource(consumerSettings: ConsumerSettings[String, String], topicName: String, partition: Int) = {

    val subscription = Subscriptions.assignmentWithOffset(
      new TopicPartition(topicName, partition) -> 0L
    )
    Consumer.plainSource(consumerSettings, subscription)
  }
} 
Developer: BenReissaus, Project: EnterpriseStreamingBenchmark, Lines of code: 42, Source file: TopicHandler.scala


Example 6: OrderProcessingService

// Package declaration and imports
package com.github.simonthecat.eventdrivenorders.orderservice

import java.util

import domain.Order
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.JavaConversions._
import scala.util.Try


class OrderProcessingService(orderConsumer: KafkaConsumer[String, String],
                             orderConsumerTopic: String,
                             storeUpdateProducer: KafkaProducer[String, String],
                             storeUpdateTopic: String) {

  import com.owlike.genson.defaultGenson._

  var running = true

  def start() = {
    orderConsumer.subscribe(util.Arrays.asList(orderConsumerTopic))

    while (running) {
      val records = orderConsumer.poll(100)
      records.iterator().foreach(processOrder)
    }
  }

  def processOrder(record: ConsumerRecord[String, String]): Unit = {
    println(s"Processing ${record.value()}")

    for {
      order <- Try(fromJson[Order](record.value()))
      _ <- Try {
        println(s"Sending to store service: $order")
        storeUpdateProducer.send(new ProducerRecord[String, String](storeUpdateTopic, toJson(order)))
      }
    } yield Unit

    println(s"Processing ${record.value()}")

  }

  def stop() = {
    orderConsumer.close()
    running = false
  }
} 
Developer: simonko91, Project: event-driven-orders, Lines of code: 51, Source file: OrderProcessingService.scala


Example 7: ConfirmationService

// Package declaration and imports
package com.github.simonthecat.eventdrivenorders.orderservice

import java.util

import domain.{OrderStatus, UpdateStoreStatus}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Try


class ConfirmationService(confirmationConsumer: KafkaConsumer[String, String],
                          confirmationTopic: String,
                          replyProducer: KafkaProducer[String, String],
                          replyTopic: String) {

  import com.owlike.genson.defaultGenson._

  var running = true

  def start() = {
    confirmationConsumer.subscribe(util.Arrays.asList(confirmationTopic))
    Future {
      while (running) {
        val records = confirmationConsumer.poll(100)
        records.iterator().foreach(processConfirmation)
      }
    }.recover {
      case ex => ex.printStackTrace()
    }
  }

  def processConfirmation(record: ConsumerRecord[String, String]): Unit = {
    println(s"Processing ${record.value()}")

    for {
      status <- Try(fromJson[UpdateStoreStatus](record.value()))
      _ <- Try {
        println(s"Replying $status")
        replyProducer.send(new ProducerRecord(replyTopic, toJson(OrderStatus(status.orderId, status.success))))
      }
    } yield Unit

    println(s"Processed ${record.value()}")
  }

  def stop() = {
    confirmationConsumer.close()
    running = false
  }
} 
Developer: simonko91, Project: event-driven-orders, Lines of code: 55, Source file: ConfirmationService.scala


Example 8: RsvpStreaming

// Package declaration and imports
package com.github.mmolimar.asks.streaming

import java.util.UUID

import com.github.mmolimar.askss.common.implicits._
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}


object RsvpStreaming extends App with LazyLogging {

  val filter = config.getString("spark.filter").toLowerCase.split(",").toList
  val ssc = new StreamingContext(buildSparkConfig, Seconds(5))

  //TODO
  kafkaStream(ssc)
    .map(_.value())
    .map(_.toEvent)
    .filter(rsvp => {
      filter.exists(rsvp.event.get.event_name.contains(_))
    })
    .print()

  ssc.start()
  ssc.awaitTermination()

  def buildSparkConfig(): SparkConf = {
    new SparkConf()
      .setMaster(config.getString("spark.master"))
      .setAppName("RsvpStreaming")
      .set("spark.streaming.ui.retainedBatches", "5")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.sql.parquet.compression.codec", "snappy")
      .set("spark.sql.parquet.mergeSchema", "true")
      .set("spark.sql.parquet.binaryAsString", "true")
  }

  def kafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val topics = Set(config.getString("kafka.topic"))

    val kafkaParams = Map[String, Object](
      "metadata.broker.list" -> config.getString("kafka.brokerList"),
      "enable.auto.commit" -> config.getBoolean("kafka.autoCommit").toString,
      "auto.offset.reset" -> config.getString("kafka.autoOffset"),
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> config.getString("kafka.brokerList"),
      ConsumerConfig.GROUP_ID_CONFIG -> s"consumer-${UUID.randomUUID}",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )

    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
  }

} 
Developer: mmolimar, Project: askss, Lines of code: 61, Source file: RsvpStreaming.scala


Example 9: ConsumerLoop

// Package declaration and imports
package org.hpi.esb.datavalidator.consumer

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.hpi.esb.datavalidator.config.KafkaConsumerConfig
import org.hpi.esb.datavalidator.util.Logging

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

class ConsumerLoop(topic: String, config: KafkaConsumerConfig, results: ListBuffer[ConsumerRecord[String, String]]) extends Runnable with Logging {

  private val props = createConsumerProps()
  private val consumer = new KafkaConsumer(props)

  initializeConsumer()

  override def run(): Unit = {

    var running = true
    var zeroCount = 0

    while (running) {
      val records = consumer.poll(1000).asInstanceOf[ConsumerRecords[String, String]]

      if (records.count() == 0) {
        logger.debug(s"Received 0 records from Kafka.")
        zeroCount += 1
        if (zeroCount == 3) {
          logger.debug("Received 0 records from Kafka for the third time. We assume the stream has finished and terminate.")
          running = false
        }
      }

      for (record <- records) {
        results.append(record)
      }
    }
    consumer.close()
  }

  private def initializeConsumer(): Unit = {
    val topicPartitions = List(new TopicPartition(topic, 0))
    consumer.assign(topicPartitions)
    consumer.seekToBeginning(topicPartitions)
  }

  private def createConsumerProps(): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, s"Validator")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.autoCommit)
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.autoCommitInterval)
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.sessionTimeout)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.keyDeserializerClass)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.valueDeserializerClass)
    props
  }
} 
Developer: BenReissaus, Project: ESB-DataValidator, Lines of code: 62, Source file: ConsumerLoop.scala


Example 10: IdentityValidation

// Package declaration and imports
package org.hpi.esb.datavalidator.validation

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.hpi.esb.datavalidator.config.Configurable
import org.hpi.esb.datavalidator.util.Logging

import scala.collection.mutable.ListBuffer


class IdentityValidation(inRecords: ListBuffer[ConsumerRecord[String, String]],
                         resultRecords: ListBuffer[ConsumerRecord[String, String]])
  extends ResultValidation(inRecords, resultRecords) with Configurable with Logging {

  override def fulfillsRequirements(): Boolean = {

    if (inRecords.size != resultRecords.size) {
      logger.info(s"Invalid identity query result. Expected 'OUT' size: ${inRecords.size} Actual: ${resultRecords.size}")
      false
    }
    else {

      inRecords.zip(resultRecords)
        .foreach { case (r1, r2) =>
          if (r1.value() != r2.value()) {
            logger.info(s"Invalid identity query result: Expected value: ${r1.value()} but found value: ${r2.value()}.")
            return false
          }
        }
      logger.info(s"Valid identity query results. Both topics have size ${inRecords.size}.")
      true
    }
  }
} 
Developer: BenReissaus, Project: ESB-DataValidator, Lines of code: 34, Source file: IdentityValidation.scala


Example 11: ResultValidation

// Package declaration and imports
package org.hpi.esb.datavalidator.validation

import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.collection.mutable.ListBuffer
import org.hpi.esb.datavalidator.util.Logging

abstract class ResultValidation(inRecords: ListBuffer[ConsumerRecord[String, String]],
                                resultRecords: ListBuffer[ConsumerRecord[String, String]]) extends Logging {

  protected def recordsAreIncomplete(): Boolean = {
    if (inRecords.isEmpty) {
      logger.info(s"'In' records list is empty. Can not validate correctly.")
      return true
    }
    if (resultRecords.isEmpty) {
      logger.info(s"'Results' records list is empty. Can not validate correctly.")
      return true
    }
    false
  }

  def fulfillsRequirements(): Boolean
} 
Developer: BenReissaus, Project: ESB-DataValidator, Lines of code: 24, Source file: ResultValidation.scala


Example 12: createConsumerRecordList

// Package declaration and imports
package org.hpi.esb.datavalidator.validation

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.record.TimestampType

import scala.collection.mutable.ListBuffer

trait ValidationTestHelpers {

  // create a list of ConsumerRecord objects
  def createConsumerRecordList(topic: String, values: List[(Long, String)]): ListBuffer[ConsumerRecord[String, String]] = {
    val l = new ListBuffer[ConsumerRecord[String, String]]()
    values.foreach { case (timestamp, value) => l.append(createConsumerRecord(topic, timestamp, value)) }
    l
  }

  def createConsumerRecord(topic: String, timestamp: Long, value: String): ConsumerRecord[String, String] = {
    val partition = 0
    val offset = 0
    val checksum = 0
    val serializedKeySize = 0
    val serializedValueSize = 0
    val key = "0"

    new ConsumerRecord[String, String](topic, partition, offset, timestamp, TimestampType.CREATE_TIME, checksum, serializedKeySize, serializedValueSize, key, value)
  }
} 
Developer: BenReissaus, Project: ESB-DataValidator, Lines of code: 28, Source file: ValidationTestHelpers.scala


Example 13: PlainSourceConsumerMain

// Package declaration and imports
package com.example.consumer

import java.util.concurrent.atomic.AtomicLong

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future


object PlainSourceConsumerMain extends App {
  implicit val system = ActorSystem("PlainSourceConsumerMain")
  implicit val materializer = ActorMaterializer()

  //TODO: move to configuration application.conf
  val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("PlainSourceConsumer")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val db = new DB
  db.loadOffset().foreach { fromOffset =>
    val partition = 0
    val subscription = Subscriptions.assignmentWithOffset(
      new TopicPartition("topic1", partition) -> fromOffset
    )
    val done =
      Consumer.plainSource(consumerSettings, subscription)
        .mapAsync(1)(db.save)
        .runWith(Sink.ignore)
  }

}

//External Offset Storage
class DB {

  private val offset = new AtomicLong(2)

  def save(record: ConsumerRecord[Array[Byte], String]): Future[Done] = {
    println(s"DB.save: ${record.value}")
    offset.set(record.offset)
    Future.successful(Done)
  }

  def loadOffset(): Future[Long] =
    Future.successful(offset.get)

  def update(data: String): Future[Done] = {
    println(s"DB.update: $data")
    Future.successful(Done)
  }
} 
Developer: makersu, Project: reactive-kafka-scala-example, Lines of code: 62, Source file: PlainSourceConsumerMain.scala


Example 14: KafkaEngine

// Package declaration and imports
package com.lljv.analytics.hadoopengine

import java.util.HashMap
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import scala.util.control.NonFatal


class KafkaEngine(val settings: KafkaSettings) extends Serializable {

  var producer: Option[KafkaProducer[String, String]] = None


  def getStreamingParameters(): Map[String, String] = {
    val parameters = Map[String, String](
      "metadata.broker.list" -> settings.kafkaBroker,
      "bootstrap.servers" -> settings.kafkaBroker,
      "key.serializer" -> settings.stringSerializerType,
      "value.serializer" -> settings.stringSerializerType,
      "key.deserializer" -> settings.stringDeserializerType,
      "value.deserializer" -> settings.stringDeserializerType,
      "group.id" -> settings.consumerGroupId
    )
    return parameters
  }

  def startStream(
                   topicName: String,
                   sparkEngine: SparkStreamEngine
                 ): Option[InputDStream[ConsumerRecord[String, String]]] =
  {
    val stream: Option[InputDStream[ConsumerRecord[String, String]]] = try {
      Some(KafkaUtils.createDirectStream[String,String](
        sparkEngine.streamingContext.get,
        PreferConsistent,
        Subscribe[String, String](Array(topicName), this.getStreamingParameters())
      ))
    } catch {
      case NonFatal(exc) => {
        // printf(exc.getMessage())
        // TODO: logging
        None
      }
    }
    return stream
  }

  

} 
Developer: dotdeb, Project: Science-Finder, Lines of code: 56, Source file: KafkaEngine.scala


Example 15: produceRecord

// Package declaration and imports
package de.choffmeister.microserviceutils.kafka.testkit

import java.util.UUID

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

import scala.concurrent.Future

trait KafkaTest {
  def produceRecord[K, V](topic: String, keySerializer: Serializer[K], valueSerializer: Serializer[V], key: K, value: V)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
    val producerSettings = ProducerSettings(system, keySerializer, valueSerializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))

    Source.single(new ProducerRecord("mail.command.send", key, value))
      .toMat(Producer.plainSink(producerSettings))(Keep.right)
      .run()
  }

  def consumeRecordPF[K, V, Out](topic: String, keyDeserializer: Deserializer[K], valueDeserializer: Deserializer[V])(pf: PartialFunction[ConsumerRecord[K, V], Out])(implicit system: ActorSystem, mat: Materializer): Future[Out] = {
    val consumerSettings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
      .withGroupId(UUID.randomUUID.toString)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
      .filter(pf.isDefinedAt)
      .map(pf)
      .toMat(Sink.head)(Keep.right)
      .run()
  }
} 
Developer: choffmeister, Project: microservice-utils, Lines of code: 40, Source file: KafkaTest.scala
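
A hedged sketch of how this test helper trait might be driven (not from the original repository): it assumes kafka.bootstrap-servers is set in the application config, and note that produceRecord above always writes to the hard-coded topic "mail.command.send" regardless of its topic argument, so the consumer side below reads from that same topic.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import de.choffmeister.microserviceutils.kafka.testkit.KafkaTest
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

object KafkaTestUsage extends App with KafkaTest {
  implicit val system = ActorSystem("kafka-test-usage")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Produce one record, then consume until a record with the matching key appears.
  produceRecord("mail.command.send", new StringSerializer, new StringSerializer, "id-1", "hello")
  consumeRecordPF("mail.command.send", new StringDeserializer, new StringDeserializer) {
    case record if record.key() == "id-1" => record.value()
  }.foreach(value => println(s"consumed: $value"))
}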


Example 16: Module

// Package declaration and imports
package com.github.dnvriend

import com.google.inject.{AbstractModule, Provides}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.springframework.kafka.core.{DefaultKafkaConsumerFactory, DefaultKafkaProducerFactory, KafkaTemplate}
import org.springframework.kafka.listener.config.ContainerProperties
import org.springframework.kafka.listener.{KafkaMessageListenerContainer, MessageListener}

import scala.collection.JavaConversions._

class Module extends AbstractModule {
  protected def configure(): Unit = {
  }

  @Provides
  def createProducerTemplate: KafkaTemplate[String, String] = {
    val senderProps: java.util.Map[String, Any] = Map(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ProducerConfig.RETRIES_CONFIG -> 0,
      ProducerConfig.BATCH_SIZE_CONFIG -> 16384,
      ProducerConfig.LINGER_MS_CONFIG -> 1,
      ProducerConfig.BUFFER_MEMORY_CONFIG -> 33554432,
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer],
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer]
    )
    val producerFactory = new DefaultKafkaProducerFactory[String, String](senderProps.mapValues(_.asInstanceOf[AnyRef]))
    new KafkaTemplate[String, String](producerFactory)
  }

  @Provides
  def createKafkaMessageListenerContainer(messageListener: MessageListener[String, String]): KafkaMessageListenerContainer[String, String] = {
    val consumerProps: java.util.Map[String, Any] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "group",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> true,
      ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "100",
      ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> "15000",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val containerProperties = new ContainerProperties("test")
    containerProperties.setMessageListener(messageListener)

    val consumerFactory = new DefaultKafkaConsumerFactory[String, String](consumerProps.mapValues(_.asInstanceOf[AnyRef]))
    val container = new KafkaMessageListenerContainer[String, String](consumerFactory, containerProperties)
    container.setBeanName("testAuto")
    container.start()
    container
  }

  @Provides
  def messageListener: MessageListener[String, String] = new MessageListener[String, String] {
    override def onMessage(message: ConsumerRecord[String, String]): Unit = {
      println(s"received: $message")
    }
  }
} 
Developer: dnvriend, Project: spring-kafka-test, Lines of code: 61, Source file: Module.scala


Example 17: KafkaSourceAdapter

// Package declaration and imports
package com.thenetcircle.event_dispatcher.driver.adapter

import akka.util.ByteString
import com.thenetcircle.event_dispatcher.RawEvent
import com.thenetcircle.event_dispatcher.driver.{ KafkaKey, KafkaValue }
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord

object KafkaSourceAdapter extends SourceAdapter[ConsumerRecord[KafkaKey, KafkaValue]] {
  override def fit(message: ConsumerRecord[KafkaKey, KafkaValue]): RawEvent =
    RawEvent(
      ByteString(message.value()),
      message.topic(),
      Map("key" -> ByteString(message.key()),
          "offset" -> message.offset(),
          "timestamp" -> message.timestamp(),
          "partition" -> message.partition())
    )
}

object KafkaSinkAdapter extends SinkAdapter[ProducerRecord[KafkaKey, KafkaValue]] {
  override def unfit(event: RawEvent): ProducerRecord[KafkaKey, KafkaValue] = {
    val context = event.context

    val topic = event.channel
    val value = event.body.toArray
    val partition = context.get("partition") match {
      case Some(p: Int) => p
      case _ => null
    }
    val timestamp = context.get("timestamp") match {
      case Some(t: Long) => t
      case _ => null
    }
    val key = context.get("key") match {
      case Some(k: ByteString) => k.toArray
      case Some(k: String) => k.getBytes("UTF-8")
      case Some(k: KafkaKey) => k
      case _ => null
    }

    new ProducerRecord[KafkaKey, KafkaValue](topic,
                                             partition.asInstanceOf[java.lang.Integer],
                                             timestamp.asInstanceOf[java.lang.Long],
                                             key,
                                             value)
  }

} 
Developer: thenetcircle, Project: event-dispatcher, Lines of code: 50, Source file: KafkaAdapter.scala


Example 18: KafkaAdapterTest

// Package declaration and imports
package com.thenetcircle.event_dispatcher.driver.adapter

import akka.util.ByteString
import com.thenetcircle.event_dispatcher.driver.{ KafkaKey, KafkaValue }
import com.thenetcircle.event_dispatcher.{ RawEvent, TestCase }
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord

class KafkaAdapterTest extends TestCase {

  val pkey = "test-key".getBytes("UTF-8")
  val pval = "test-data".getBytes("UTF-8")
  val rawEvent = RawEvent(
    ByteString(pval),
    "test-channel",
    Map(
      "key" -> ByteString(pkey),
      "partition" -> 1,
      "offset" -> 10,
      "timestamp" -> -1
    )
  )

  test("kafka source adapter") {

    val adapter = KafkaSourceAdapter
    val message =
      new ConsumerRecord[KafkaKey, KafkaValue]("test-channel", 1, 10, pkey, pval)

    adapter.fit(message) should equal(rawEvent)

  }

  test("kafka sink adapter") {

    val adapter = KafkaSinkAdapter
    val actual = adapter.unfit(rawEvent)
    val expected = new ProducerRecord[KafkaKey, KafkaValue](
      "test-channel",
      1,
      null,
      pkey,
      pval
    )

    actual.topic() shouldEqual expected.topic()
    actual.partition() shouldEqual expected.partition()
    actual.timestamp() shouldEqual expected.timestamp()
    actual.key() shouldBe expected.key()
    actual.value() shouldBe expected.value()

  }

} 
Developer: thenetcircle, Project: event-dispatcher, Lines of code: 55, Source file: KafkaAdapterTest.scala


Example 19: Main

// Package declaration and imports
package connector

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._

object Main {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()

    val kafkaConsumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    val kafkaSubscription = Subscriptions.topics("input")
    val maxItemsInBatch = 10

    Consumer.plainSource(kafkaConsumerSettings, kafkaSubscription)
      .groupedWithin(maxItemsInBatch, 10000.milliseconds)
      .runForeach(batch => persist(batch))
  }

//  TODO: Sent batch to ElasticSearch
  def persist(batch: Seq[ConsumerRecord[Array[Byte], String]]): Unit = {
    batch foreach println
  }
} 
Developer: jozi-k, Project: kafka-to-es-akka, Lines of code: 36, Source file: Main.scala


Example 20: LateKafka

// Package declaration and imports
package co.blocke
package latekafka

import akka.stream.scaladsl.Source
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.concurrent.{ Promise, Await }
import scala.concurrent.duration._

case class LateKafka[V](
    host:         String,
    groupId:      String,
    topic:        String,
    deserializer: Deserializer[V],
    properties:   Map[String, String] = Map.empty[String, String]
) extends Iterator[ConsumerRecord[Array[Byte], V]] {

  type REC = ConsumerRecord[Array[Byte], V]
  type ITER_REC = Iterator[REC]

  private val t = KafkaThread[V](host, groupId, topic, deserializer, properties)
  private val h = Heartbeat(t, 100L)
  private var i: ITER_REC = null
  private var hasMore = true

  new java.lang.Thread(t).start
  new java.lang.Thread(h).start
  Thread.sleep(500)

  def done() = hasMore = false
  def stop() = {
    done()
    Thread.sleep(500)
    t.stop()
    h.stop()
  }

  def hasNext = hasMore
  def next() = {
    while (i.isEmpty || !i.hasNext)
      fill()
    i.next
  }

  def commit(cr: REC) = t ! cr
  def source = {
    fill()
    Source.fromIterator(() => this)
  }

  private def fill() = {
    val p = Promise[ITER_REC]()
    val f = p.future
    t ! p
    i = Await.result(f, Duration.Inf).asInstanceOf[ITER_REC]
  }
} 
Developer: gzoller, Project: LateKafka, Lines of code: 58, Source file: LateKafka.scala



Note: The org.apache.kafka.clients.consumer.ConsumerRecord class examples in this article were compiled from source-code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects contributed by their respective developers, and copyright remains with the original authors. Please refer to the corresponding project's license before distributing or using the code; do not republish without permission.

