• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Consumer类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.kafka.scaladsl.Consumer的典型用法代码示例。如果您正苦于以下问题:Scala Consumer类的具体用法?Scala Consumer怎么用?Scala Consumer使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Consumer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ProcessingKafkaApplication

//设置package包名称以及导入依赖的类
package com.packt.chapter8

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.duration._

/**
  * Demo application that runs two independent pipelines inside one closed graph:
  *
  *  - a tick source that emits a fixed greeting every 5 seconds (first tick immediately)
  *    and publishes it to `kafkaTopic`;
  *  - a Kafka source reading partition `partition` of the same topic and printing
  *    each record value to standard output.
  */
object ProcessingKafkaApplication extends App {
  implicit val actorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  // The consumer is bound to one explicitly named topic partition.
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings =
    ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("akka_streams_group")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings =
    ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Producing side: greeting text -> ProducerRecord -> Kafka.
    val greetings = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
    val toRecord = Flow[String].map(text => new ProducerRecord[Array[Byte], String](kafkaTopic, text))
    val publish = Producer.plainSink(producerSettings)

    // Consuming side: Kafka -> record value -> stdout.
    val incoming = Consumer.plainSource(consumerSettings, subscription)
    val toValue = Flow[ConsumerRecord[Array[Byte], String]].map(_.value())
    val console = Sink.foreach(println)

    greetings ~> toRecord ~> publish
    incoming ~> toValue ~> console

    ClosedShape
  })

  runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala


示例2: Pusher

//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.pusher

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.handler.ssl.SslContext
import org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
import reactivehub.akka.stream.apns.Environment._
import reactivehub.akka.stream.apns.TlsUtil._
import reactivehub.akka.stream.apns._
import reactivehub.akka.stream.apns.marshallers.SprayJsonSupport

/**
  * Reads push-notification requests from Kafka and forwards them through an APNs
  * connection flow. When the stream's materialized future completes, the Netty
  * event loop group and the actor system are shut down.
  */
object Pusher extends SprayJsonSupport {
  val kafka = "192.168.99.100:9092"
  val clientId = "pusher1"
  val consumerGroup = "pusher"
  val topics = Set("notifications")

  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()

  import system.dispatcher

  def main(args: Array[String]): Unit = {
    val eventLoop = new NioEventLoopGroup()
    val apnsFlow = ApnsExt(system).connection[Long](Development, sslContext, eventLoop)

    val finished = Consumer
      .atMostOnceSource(consumerSettings)
      .map(record => record.key -> toNotification(record.value))
      // Only notifications whose device token is shorter than 100 bytes pass through.
      .filter { case (_, notification) => notification.deviceToken.bytes.length < 100 }
      .viaMat(apnsFlow)(Keep.right)
      .log("pusher", _.toString())
      .to(Sink.ignore)
      .run()

    // Release resources regardless of how the stream ended.
    finished.onComplete { _ =>
      eventLoop.shutdownGracefully()
      system.terminate()
    }
  }

  /** TLS context built from the bundled PKCS#12 resource. */
  private def sslContext: SslContext =
    loadPkcs12FromResource("/cert.p12", "password")

  /** Consumer configuration for the `topics` set, starting from the earliest offset. */
  private def consumerSettings: ConsumerSettings[Long, PushData] =
    ConsumerSettings(system, ScalaLongDeserializer, PushDataDeserializer, topics)
      .withBootstrapServers(kafka)
      .withClientId(clientId)
      .withGroupId(consumerGroup)
      .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")

  /** Builds a notification whose payload carries the alert and badge when they are present. */
  private def toNotification(pushData: PushData): Notification = {
    val withAlert = pushData.alert.foldLeft(Payload.Builder())((b, alert) => b.withAlert(alert))
    val withBadge = pushData.badge.foldLeft(withAlert)((b, badge) => b.withBadge(badge))
    Notification(DeviceToken(pushData.token), withBadge.result)
  }
}
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:60,代码来源:Pusher.scala


示例3: Settings

//设置package包名称以及导入依赖的类
package com.scalaio.kafka.consumer

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.scalaio.kafka.consumer.Settings.consumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.Future

/** Shared Kafka consumer/producer settings for the examples in this package. */
object Settings {

  /** Consumer settings: byte-array keys, string values, group "CommittableSourceConsumer". */
  def consumerSettings(implicit system: ActorSystem) = {
    val base = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    base
      .withBootstrapServers("localhost:9092")
      .withGroupId("CommittableSourceConsumer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  }

  /** Producer settings: byte-array keys, string values. */
  def producerSettings(implicit system: ActorSystem) = {
    val base = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    base.withBootstrapServers("localhost:9092")
  }
}

/**
  * Consumes "topic1" one message at a time, hands each value to
  * `BusinessController.handleMessage`, and commits the message offset explicitly.
  */
object CommittableSource extends App {

  type KafkaMessage = CommittableMessage[Array[Byte], String]

  implicit val system = ActorSystem("CommittableSourceConsumerMain")
  implicit val materializer = ActorMaterializer()

  implicit val ec = system.dispatcher

  // explicit commit
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { (msg: KafkaMessage) =>
      def commitOffset() = msg.committableOffset.commitScaladsl()

      // The offset is committed after a successful handler call, and a commit is
      // also attempted when anything in the chain above fails.
      BusinessController
        .handleMessage(msg.record.value)
        .flatMap(_ => commitOffset())
        .recoverWith { case _ => commitOffset() }
    }
    .runWith(Sink.ignore)

}

/** Stand-in business logic used by the consumer example. */
object BusinessController {

  /** An asynchronous function from `A` to `B`. */
  type Service[A, B] = A => Future[B]

  /** Returns the upper-cased message wrapped in an already-completed future. */
  val handleMessage: Service[String, String] = { text =>
    val upperCased = text.toUpperCase
    Future.successful(upperCased)
  }

}
开发者ID:fagossa,项目名称:scalaio_akka,代码行数:56,代码来源:CommittableSource.scala


示例4: PacketConsumer

//设置package包名称以及导入依赖的类
package edu.uw.at.iroberts.wirefugue.kafka.consumer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.typesafe.config.ConfigFactory
import edu.uw.at.iroberts.wirefugue.kafka.producer.KafkaKey
import edu.uw.at.iroberts.wirefugue.kafka.serdes.{PacketDeserializer, PacketSerde}
import edu.uw.at.iroberts.wirefugue.pcap.Packet
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.IntegerDeserializer

import scala.concurrent.Await
import scala.concurrent.duration._


/**
  * Consumes the "packets" topic with one sub-stream per assigned partition and prints
  * every packet that has an IP section, prefixed with its partition number.
  * Blocks until the outer stream completes, then terminates the actor system.
  */
object PacketConsumer extends App {
  type PacketRecord = ConsumerRecord[KafkaKey, Array[Byte]]
  val config = ConfigFactory.load("application.conf")

  implicit val system = ActorSystem("stream-consumer-system", config)
  implicit val materializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new IntegerDeserializer, new PacketDeserializer)
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  // Separate streams for each partition
  val maxPartitions = 100
  val consumerGroup = Consumer.plainPartitionedSource(consumerSettings, Subscriptions.topics("packets"))

  val done = consumerGroup
    .map { case (topicPartition, partitionSource) =>
      val partitionId: Int = topicPartition.partition
      val printPacket = Sink.foreach[Packet](packet => println(s"[$partitionId] $packet"))

      partitionSource
        .map((record: ConsumerRecord[Integer, Packet]) => record.value())
        .filter(packet => packet.ip.isDefined)
        .toMat(printPacket)(Keep.both)
        .run()
    }
    // Each element is the (source value, sink value) pair produced by Keep.both;
    // the sink's future keeps the outer stream waiting on that partition.
    .mapAsyncUnordered(maxPartitions) { case (_, sinkCompletion) => sinkCompletion }
    .runWith(Sink.ignore)

  Await.result(done, Duration.Inf)

  system.terminate()
}
开发者ID:robertson-tech,项目名称:wirefugue,代码行数:50,代码来源:PacketConsumer.scala


示例5: StreamConsumer

//设置package包名称以及导入依赖的类
package consumers

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import cats.data.Xor
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import io.circe._
import io.circe.generic.auto._
import cats.data.Xor.{Left, Right}
import model.Employee

import scala.concurrent.Future

/**
  * Reads string records from "raw-data-1" and passes them to `Processor.processMessage`
  * with a parallelism of 4. Auto-commit is enabled with a 1000 ms interval.
  */
object StreamConsumer extends App {
  implicit val actorSystem = ActorSystem("consumer-actors", ConfigFactory.load())
  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem))

  lazy val consumerSettings =
    ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group13")
      // "latest" was the alternative considered here.
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
      .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")

  lazy val subscription = Subscriptions.topics("raw-data-1")
  lazy val db = new Processor()

  Consumer
    .plainSource(consumerSettings, subscription)
    .mapAsync(4)(db.processMessage _)
    .runWith(Sink.ignore)

}

/** Prints incoming records and tries to decode each non-null value as an `Employee`. */
class Processor {

  /**
    * Logs the raw record value, then, if the value is not null, decodes it as JSON
    * and prints either the decoding error or the employee's name.
    *
    * @param record the consumed Kafka record
    * @return an already-completed future, regardless of the decoding outcome
    */
  def processMessage(record: ConsumerRecord[Array[Byte], String]): Future[Done] = {
    println(s"DB.save: ${record.value()}")

    for (json <- Option(record.value())) {
      val decoded: Xor[Error, Employee] = jawn.decode[Employee](json)
      decoded.fold(
        failure => println(failure),
        employee => println(s"employee name: ${employee.name}")
      )
    }

    Future.successful(Done)
  }
}
开发者ID:ajit-scala,项目名称:kafka-consumers,代码行数:55,代码来源:StreamConsumer.scala


示例6: TopicHandler

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.kafka

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.hpi.esb.util.OffsetManagement

/**
  * Bundles a topic name with its message count and a Kafka source for reading it.
  *
  * @param topicName        name of the Kafka topic
  * @param numberOfMessages message count for the topic (in this file it is obtained from
  *                         `OffsetManagement.getNumberOfMessages` for partition 0)
  * @param topicSource      source of string key/value records for the topic
  */
case class TopicHandler(topicName: String, numberOfMessages: Long, topicSource: Source[ConsumerRecord[String, String], Consumer.Control])

/** Factory helpers for [[TopicHandler]]. */
object TopicHandler {

  /**
    * Builds a handler for partition 0 of `topicName`, using a consumer with a freshly
    * generated random group id.
    *
    * @param topicName topic to read
    * @param system    actor system used to create the consumer settings
    */
  def create(topicName: String, system: ActorSystem): TopicHandler = {
    val partition = 0
    val randomGroupId = java.util.UUID.randomUUID.toString

    val consumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("192.168.30.208:9092,192.168.30.207:9092,192.168.30.141:9092")
        .withGroupId(randomGroupId)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Int.MaxValue.toString)
        .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Int.MaxValue.toString)
        .withProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20485000")

    val source = createSource(consumerSettings, topicName, partition)
    val messageCount = OffsetManagement.getNumberOfMessages(topicName, partition)

    TopicHandler(topicName, messageCount, source)
  }

  /**
    * Creates a plain source assigned to the given topic partition, starting at offset 0.
    *
    * @param consumerSettings consumer configuration
    * @param topicName        topic to read
    * @param partition        partition of the topic to read
    */
  def createSource(consumerSettings: ConsumerSettings[String, String], topicName: String, partition: Int) = {
    val topicPartition = new TopicPartition(topicName, partition)
    val fromOffsetZero = Subscriptions.assignmentWithOffset(topicPartition -> 0L)
    Consumer.plainSource(consumerSettings, fromOffsetZero)
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:42,代码来源:TopicHandler.scala


示例7: Main

//设置package包名称以及导入依赖的类
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.Future

/**
  * Consumes "topic1", commits every message offset and prints the partition and
  * value of each message once its commit has completed.
  */
object Main {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem.apply("akka-stream-kafka")
    implicit val materializer = ActorMaterializer()
    // Execution context for mapping over the commit future below.
    import system.dispatcher

    val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      // Kafka expects a comma-separated host:port list; a ';' separator is not parsed as two brokers.
      .withBootstrapServers("localhost:9092,localhost:9093")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      // Wait for the commit instead of discarding its future, so that a failed
      // commit fails the stream rather than being silently lost.
      .mapAsync(1)(msg => msg.committableOffset.commitScaladsl().map(_ => msg))
      .runForeach(msg => println(s"partition: ${msg.record.partition}; value: ${msg.record.value}"))
  }
}
开发者ID:kczulko,项目名称:akka-streams-kafka,代码行数:28,代码来源:Main.scala


示例8: ReactiveKafkaSingleConsumerMultipleProducerScala

//设置package包名称以及导入依赖的类
package org.rgcase.reactivekafka

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffsetBatch }
import akka.kafka.ProducerMessage.Message
import akka.kafka.scaladsl.{ Consumer, Producer }
import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer }

/**
  * Reads "sourcetopic" and republishes each message value, in sequence, to three target
  * topics. Offsets are then committed in batches of up to 20.
  *
  * NOTE(review): this is declared as a `class` extending `App`; a runnable entry point
  * normally has to be an `object`. Left unchanged to keep the public interface — confirm intent.
  */
class ReactiveKafkaSingleConsumerMultipleProducerScala extends App {

  implicit val system = ActorSystem("reactivekafkascala")
  implicit val mat = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9093")

  val kafkaSource =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("sourcetopic"))

  /**
    * Returns a function that wraps a consumed message into a producer message for `topic`,
    * carrying the original consumed message along as pass-through.
    */
  def toProducerMessage(topic: String) = (msg: CommittableMessage[Array[Byte], String]) =>
    Message[Array[Byte], String, CommittableMessage[Array[Byte], String]](new ProducerRecord(topic, msg.record.value), msg)

  /** Publishes each message to `topic` and re-emits the original consumed message. */
  private def forwardTo(topic: String) =
    Flow.fromFunction(toProducerMessage(topic)).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  val producerFlow1 = forwardTo("targettopic1")

  val producerFlow2 = forwardTo("targettopic2")

  val producerFlow3 = forwardTo("targettopic3")

  kafkaSource
    .via(producerFlow1)
    .via(producerFlow2)
    .via(producerFlow3)
    // Accumulate up to 20 offsets into one batch before committing.
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
      batch.updated(elem.committableOffset)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)

}
开发者ID:rgcase,项目名称:testplayground,代码行数:52,代码来源:ReactiveKafkaSingleConsumerMultipleProducerScala.scala


示例9: CommitConsumerToFlowProducerMain

//设置package包名称以及导入依赖的类
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}


/**
  * Copies every message from "topic1" to "topic2" through a producer flow and commits
  * the consumed offset after the producer flow has emitted the corresponding result.
  */
object CommitConsumerToFlowProducerMain extends App {
  implicit val system = ActorSystem("CommitConsumerToFlowProducerMain")
  implicit val materializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("CommitConsumerToFlowProducer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  val done =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .map { consumed =>
        println(s"topic1 -> topic2: $consumed")
        val outgoing = new ProducerRecord[Array[Byte], String]("topic2", consumed.record.value)
        // The committable offset travels with the record as pass-through.
        ProducerMessage.Message(outgoing, consumed.committableOffset)
      }
      .via(Producer.flow(producerSettings))
      .mapAsync(producerSettings.parallelism)(_.message.passThrough.commitScaladsl())
      .runWith(Sink.ignore)
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:41,代码来源:CommitConsumerToFlowProducerMain.scala


示例10: ConsumerToCommitableSinkProducerMain

//设置package包名称以及导入依赖的类
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}


/**
  * Copies every message from "topic1" to "topic2", handing the consumed offset to
  * `Producer.commitableSink` together with each outgoing record.
  */
object ConsumerToCommitableSinkProducerMain extends App {

  implicit val system = ActorSystem("Consumer2ProducerMain")
  implicit val materializer = ActorMaterializer()

  //TODO: move to configuration application.conf
  val consumerSettings = {
    val base = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    base
      .withBootstrapServers("localhost:9092")
      .withGroupId("Consumer2Producer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  }

  //TODO: move to configuration application.conf
  val producerSettings = {
    val base = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    base.withBootstrapServers("localhost:9092")
  }

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map { consumed =>
      println(s"topic1 -> topic2: $consumed")
      val outgoing = new ProducerRecord[Array[Byte], String]("topic2", consumed.record.value)
      ProducerMessage.Message(outgoing, consumed.committableOffset)
    }
    .runWith(Producer.commitableSink(producerSettings))

}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:42,代码来源:ConsumerToCommitableSinkProducerMain.scala


示例11: produceRecord

//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka.testkit

import java.util.UUID

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

import scala.concurrent.Future

/** Test helpers for producing a single Kafka record and for awaiting a matching one. */
trait KafkaTest {

  /**
    * Publishes one record to `topic`.
    *
    * The bootstrap servers are read from the `kafka.bootstrap-servers` key of the actor
    * system's configuration.
    *
    * @param topic           topic to publish to
    * @param keySerializer   serializer for the record key
    * @param valueSerializer serializer for the record value
    * @param key             record key
    * @param value           record value
    * @return the materialized future of the producer sink
    */
  def produceRecord[K, V](topic: String, keySerializer: Serializer[K], valueSerializer: Serializer[V], key: K, value: V)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
    val producerSettings = ProducerSettings(system, keySerializer, valueSerializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))

    // Use the caller-supplied topic; it was previously ignored in favour of a hard-coded name.
    Source.single(new ProducerRecord[K, V](topic, key, value))
      .toMat(Producer.plainSink(producerSettings))(Keep.right)
      .run()
  }

  /**
    * Subscribes to `topic` with a fresh random group id, starting from the earliest offset,
    * and returns the result of `pf` for the first record on which `pf` is defined.
    *
    * @param topic             topic to consume
    * @param keyDeserializer   deserializer for record keys
    * @param valueDeserializer deserializer for record values
    * @param pf                selects and transforms the record of interest
    * @return a future of the first transformed record (from `Sink.head`)
    */
  def consumeRecordPF[K, V, Out](topic: String, keyDeserializer: Deserializer[K], valueDeserializer: Deserializer[V])(pf: PartialFunction[ConsumerRecord[K, V], Out])(implicit system: ActorSystem, mat: Materializer): Future[Out] = {
    val consumerSettings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
      .withGroupId(UUID.randomUUID.toString)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
      .collect(pf)
      .toMat(Sink.head)(Keep.right)
      .run()
  }
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:40,代码来源:KafkaTest.scala


示例12: KafkaDao

//设置package包名称以及导入依赖的类
package dao

import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer.Control
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{
  ConsumerSettings,
  ProducerSettings,
  Subscriptions
}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}
import play.api.libs.json.Format
import [email protected]@

/**
  * Event DAO whose event store is a Kafka topic: strings written to the store are
  * published to `topic`, and strings read from it are the values consumed from `topic`.
  *
  * @param topic name of the Kafka topic, tagged with the event type
  */
class KafkaDao[Event](val topic: String @@ Event)(implicit mat: Materializer,
                      sys: ActorSystem,
                      format: Format[Event])
    extends EventDao[Event] {

  /** Producer configuration: byte-array keys, string values. */
  def producerSettings =
    ProducerSettings(sys, new ByteArraySerializer(), new StringSerializer())
      .withBootstrapServers("localhost:9092")

  /** Sink that wraps each string in a record for `topic` and publishes it. */
  private def kafkaIn: Sink[String, NotUsed] = {
    val toRecord =
      Flow[String].map(payload => new ProducerRecord[Array[Byte], String](topic, payload))
    toRecord.to(Producer.plainSink(producerSettings))
  }

  /** Consumer configuration: byte-array keys, string values, group "group1". */
  private def consumerSettings =
    ConsumerSettings(sys, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")

  /** Source emitting the value of every record consumed from `topic`. */
  private def kafkaOut: Source[String, Control] = {
    val records = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
    records.map(record => record.value())
  }

  override protected def eventStore: Flow[String, String, NotUsed] =
    Flow.fromSinkAndSource(kafkaIn, kafkaOut)
}
开发者ID:leanovate,项目名称:contoso-conference-manager,代码行数:60,代码来源:KafkaDao.scala


示例13: ServiceKafkaConsumer

//设置package包名称以及导入依赖的类
package services

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.Configuration

import scala.concurrent.Future


/**
  * Starts, on construction, a stream that consumes the given topics, passes every record
  * value to `handleEvent`, and then commits the record's offset.
  *
  * @param topicNames    topics to subscribe to
  * @param groupName     Kafka consumer group id
  * @param mat           materializer used to run the stream
  * @param actorSystem   actor system used to create the consumer settings
  * @param configuration application configuration; must contain a "kafka" section
  * @param handleEvent   callback invoked with each record value
  */
class ServiceKafkaConsumer(topicNames: Set[String], groupName: String, implicit val mat: Materializer,
                           actorSystem: ActorSystem, configuration: Configuration, handleEvent: String => Unit) {
  val config = configuration.getConfig("kafka")
    .getOrElse(throw new Exception("No config element for kafka!"))
    .underlying

  val consumerSettings =
    ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(config.getString("bootstrap.servers"))
      .withGroupId(groupName)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getString("auto.offset.reset"))

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topicNames))
    .mapAsync(1) { consumed =>
      // The handler runs synchronously; the message is then passed on for committing.
      handleEvent(consumed.record.value())
      Future.successful(consumed)
    }
    .mapAsync(1)(handled => handled.committableOffset.commitScaladsl())
    .runWith(Sink.ignore)

}
开发者ID:getArtemUsername,项目名称:play-and-events,代码行数:37,代码来源:ServiceKafkaConsumer.scala


示例14: KafkaSource

//设置package包名称以及导入依赖的类
package com.thenetcircle.event_dispatcher.source

import akka.kafka.ConsumerMessage.{ CommittableOffset, CommittableOffsetBatch }
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ AutoSubscription, Subscriptions }
import akka.stream.scaladsl.{ Flow, Source }
import akka.{ Done, NotUsed }
import com.thenetcircle.event_dispatcher.driver.adapter.KafkaSourceAdapter
import com.thenetcircle.event_dispatcher.driver.extractor.Extractor
import com.thenetcircle.event_dispatcher.{ Event, EventFmt }

/** Kafka-backed event source plus a matching flow for committing consumed offsets. */
object KafkaSource {

  /**
    * Creates a source of events read from Kafka. Each event carries its committable
    * offset in the raw event context under the key "committableOffset", so that it can
    * be committed later via [[commit]].
    *
    * @param settings  source settings; either `topics` or `topicPattern` must be defined
    *                  (`topics` takes precedence when both are)
    * @param extractor converts the adapted raw record into an `Event`
    * @throws IllegalArgumentException if neither topics nor a topic pattern is configured
    */
  def atLeastOnce[Fmt <: EventFmt](
      settings: KafkaSourceSettings
  )(implicit extractor: Extractor[Fmt]): Source[Event, Consumer.Control] = {
    val subscription: AutoSubscription =
      settings.topics
        .map(topics => Subscriptions.topics(topics))
        .orElse(settings.topicPattern.map(pattern => Subscriptions.topicPattern(pattern)))
        .getOrElse(throw new IllegalArgumentException("Kafka source need subscribe topics"))

    Consumer
      .committableSource(settings.consumerSettings, subscription)
      .map(msg => KafkaSourceAdapter.fit(msg.record).addContext("committableOffset", msg.committableOffset))
      .map(extractor.extract)
  }

  /**
    * Flow that commits the offsets attached to events by [[atLeastOnce]].
    *
    * @param parallelism maximum number of commits in flight
    * @param batchMax    maximum number of offsets folded into one commit batch
    */
  def commit(parallelism: Int = 3, batchMax: Int = 20): Flow[Event, Done, NotUsed] =
    Flow[Event]
      // The context value is untyped here; it is expected to be the offset stored by atLeastOnce.
      .map(_.rawEvent.context("committableOffset").asInstanceOf[CommittableOffset])
      .batch(max = batchMax, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
        batch.updated(elem)
      }
      .mapAsync(parallelism)(_.commitScaladsl())

}
开发者ID:thenetcircle,项目名称:event-dispatcher,代码行数:45,代码来源:KafkaSource.scala


示例15: Main

//设置package包名称以及导入依赖的类
package connector

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._

/**
  * Reads the "input" topic, groups records into batches of at most 10 items or
  * 10000 milliseconds, and hands every batch to `persist`.
  */
object Main {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()

    val kafkaConsumerSettings =
      ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("group1")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
        .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    val kafkaSubscription = Subscriptions.topics("input")
    val maxItemsInBatch = 10
    val maxBatchDelay = 10000.milliseconds

    Consumer
      .plainSource(kafkaConsumerSettings, kafkaSubscription)
      .groupedWithin(maxItemsInBatch, maxBatchDelay)
      .runForeach(records => persist(records))
  }

  // TODO: Send batch to ElasticSearch
  /** Prints every record of the batch to standard output. */
  def persist(batch: Seq[ConsumerRecord[Array[Byte], String]]): Unit =
    batch.foreach(println)
}
开发者ID:jozi-k,项目名称:kafka-to-es-akka,代码行数:36,代码来源:Main.scala


示例16: KafkaConsumerExample

//设置package包名称以及导入依赖的类
package com.benencahill.akka.streams

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.RunnableGraph
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future
import scala.util.Success

/**
  * Reads up to 100 records from "testing-akka-streams", unmarshals each value into a
  * `UserEvent`, prints it, and terminates the actor system once the stream has finished.
  */
object KafkaConsumerExample extends App {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val context = system.dispatcher

  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("testing-akka-streams")
    // The property key must be the *_CONFIG constant; *_DOC holds the setting's description text.
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val kafkaSource = Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("testing-akka-streams"))
    .map(record => record.value())

  val deserialize = Flow[String].mapAsync(1) { data =>
    import UserEventJsonProtocol._
    Unmarshal(data).to[UserEvent]
  }

  val print = Sink.foreach[UserEvent](event => println(event))

  val blueprint: RunnableGraph[Future[Done]] = kafkaSource.take(100).via(deserialize).toMat(print)(Keep.right)

  val materialzied = blueprint.run()

  // Shut down on failure as well, so a failed stream does not leave the actor system running.
  materialzied.onComplete(_ => system.terminate())
}
开发者ID:benen,项目名称:akka-streams-intro,代码行数:44,代码来源:KafkaConsumerExample.scala


示例17: throttolableConsumerFlow

//设置package包名称以及导入依赖的类
package com.github.everpeace

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration.Duration

package object reactive_kafka {

  /**
    * Builds a source that consumes a topic, optionally throttles it, periodically logs a
    * consumed message, and commits offsets in batches.
    *
    * All parameters are read from the "throttolable-consumer" section of the actor
    * system's configuration. Throttling is applied only when `throttle` is greater than 0.
    *
    * @return a source emitting one `Done` per committed offset batch
    */
  def throttolableConsumerFlow(implicit system: ActorSystem, mat: ActorMaterializer): Source[Done, Consumer.Control] = {
    val c = system.settings.config.getConfig("throttolable-consumer")
    implicit val ec = system.dispatcher

    val bootstrapServers = c.getString("bootstrap-servers")
    val topic = c.getString("topic")
    val autoRestConfig = c.getString("auto-offset-reset")
    val groupId = c.getString("group-id")
    val throttle = c.getInt("throttle")
    val throttlePer = Duration.fromNanos(c.getDuration("throttle-per").toNanos)
    val throttleBurst = c.getInt("throttle-burst")
    val logPer = c.getInt("log-per")
    val offsetCommitBatchSize = c.getInt("offset-commit-batch-size")
    val offsetCommitParallelism = c.getInt("offset-commit-parallelism")

    val consumerSettings =
      ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
        .withBootstrapServers(bootstrapServers)
        .withGroupId(groupId)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoRestConfig)

    val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(topic)))

    val throttled =
      if (throttle > 0) source.throttle(throttle, throttlePer, throttleBurst, ThrottleMode.shaping)
      else source

    // Logs the first message and then one message every `logPer` messages.
    val logged = throttled.statefulMapConcat { () =>
      var sinceLastLog = 0
      msg => {
        if (sinceLastLog % logPer == 0) {
          system.log.info(s"FakeConsumer consume: $msg")
          sinceLastLog = 0
        }
        sinceLastLog += 1
        msg :: Nil
      }
    }

    logged
      .batch(max = offsetCommitBatchSize, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) {
        (offsets, next) => offsets.updated(next.committableOffset)
      }
      .mapAsync(offsetCommitParallelism)(offsets => offsets.commitScaladsl())
  }
}
开发者ID:everpeace,项目名称:throttolable-perf-consumer,代码行数:60,代码来源:package.scala


示例18: RequestUriToGeoCoordinateTransformer

//设置package包名称以及导入依赖的类
package services

import akka.actor._
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

/**
 * Stand-alone application that reads request paths from the Kafka topic `requestpathdata`,
 * enriches each one through `GeoCoordinatesService.enrich`, drops the paths for which no
 * coordinate is returned, and publishes the remaining coordinates to `geocoordinatedata`.
 */
object RequestUriToGeoCoordinateTransformer extends App {
  implicit val system = ActorSystem("Request-Processor")
  implicit val materializer = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("RequestPathConsumer")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  Consumer.plainSource(consumerSettings, Subscriptions.topics("requestpathdata"))
    .map(r => GeoCoordinatesService.enrich(r.value()))
    .collect { case Some(geo) => geo }
    .map(convertToRecord)
    .runWith(Producer.plainSink(producerSettings))
    // The materialized future used to be discarded, so a failed stream went unnoticed while
    // the actor system kept the process alive. Report the failure and shut down instead.
    .onComplete { outcome =>
      outcome.failed.foreach { cause =>
        system.log.error(cause, "Request path to geo coordinate stream failed")
      }
      system.terminate()
    }(system.dispatcher)

  /**
   * Wraps a coordinate into a key-less record for the `geocoordinatedata` topic.
   *
   * @param geo coordinate to publish
   * @return a record whose value is `geo.serializeToString()`
   */
  def convertToRecord(geo: GeoCoordinate): ProducerRecord[String, String] = {
    new ProducerRecord[String, String]("geocoordinatedata", geo.serializeToString())
  }
}
开发者ID:aerohit,项目名称:TravellingWhere,代码行数:33,代码来源:RequestUriToGeoCoordinateTransformer.scala


示例19: StartConsuming

//设置package包名称以及导入依赖的类
package services

import actors.SubscribableActor
import akka.Done
import akka.actor.Actor
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

/** Message telling `GeoCoordinatesKafkaConsumer` to start its consumption task; it is only acted on while consumption has not started yet. */
case object StartConsuming

/** Message whose sender `GeoCoordinatesKafkaConsumer` passes to `subscribe`. */
case object RegisterListener

/** Message whose sender `GeoCoordinatesKafkaConsumer` passes to `unsubscribe`. */
case object UnRegisterListener

/**
 * Actor that consumes the Kafka topic `geocoordinatedata` and hands every record that parses
 * into a `GeoCoordinate` to `notifySubscribers`.
 *
 * Handled messages:
 *  - `StartConsuming`: starts the consumer stream, at most once per actor instance.
 *  - `RegisterListener` / `UnRegisterListener`: subscribes / unsubscribes the sender.
 *
 * @param materializer materializer used to run the consumer stream
 */
class GeoCoordinatesKafkaConsumer(implicit val materializer: ActorMaterializer)
  extends Actor with SubscribableActor[GeoCoordinate] {
  // Set on the first StartConsuming and never reset, so the stream is started only once.
  var hasStartedConsuming = false

  override def receive = {
    case StartConsuming if !hasStartedConsuming =>
      hasStartedConsuming = true
      startConsumptionTask()
    case RegisterListener =>
      subscribe(sender())
    case UnRegisterListener =>
      unsubscribe(sender())
  }

  /**
   * Runs the consumer stream and logs the cause if it terminates with a failure.
   *
   * @return the stream's completion future
   */
  private def startConsumptionTask(): Future[Done] = {
    // Resolved here, on the actor's own thread: `context` must not be touched from the
    // future callback registered below.
    val log = context.system.log
    val callbackExecutor = context.dispatcher

    // NOTE(review): notifySubscribers is invoked by the stream, not from `receive` --
    // confirm that SubscribableActor tolerates being called outside message processing.
    val completion =
      Consumer.plainSource(consumerSettings, Subscriptions.topics("geocoordinatedata"))
        .map(r => GeoCoordinate.parseFromString(r.value()))
        .collect { case Some(geo) => geo }
        .runForeach(notifySubscribers)

    // The caller drops this future, so without this a failed stream would go unnoticed.
    completion.failed.foreach { cause =>
      log.error(cause, "Kafka consumption of topic 'geocoordinatedata' failed")
    }(callbackExecutor)

    completion
  }

  private val consumerSettings =
    ConsumerSettings(context.system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("GeocoordinateConsumer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}
开发者ID:aerohit,项目名称:TravellingWhere,代码行数:47,代码来源:GeoCoordinatesKafkaConsumer.scala


示例20: ProducerConsumerTest

//设置package包名称以及导入依赖的类
package com.github.dnvriend.kafka

import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.scaladsl.Producer.Message
import akka.stream.scaladsl.Source
import com.github.dnvriend.TestSpec
import org.apache.kafka.clients.producer.ProducerRecord

/**
 * Round trip against Kafka: publishes the numbers 1 to 10000 as strings to `topic1`, then
 * reads 1000 records back with a plain consumer source.
 */
class ProducerConsumerTest extends TestSpec {

  it should "produce and consume messages" in {
    // Each number is sent as the record value and also carried along as the pass-through.
    val published = Source(1 to 10000)
      .map { number =>
        val elem = number.toString
        Message(new ProducerRecord[Array[Byte], String]("topic1", elem), elem)
      }
      .via(Producer.flow(producerSettings))
      .map { result =>
        val record = result.message.record
        println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value} (${result.message.passThrough})")
        result
      }
      .runForeach(_ => ())

    published.toTry should be a 'success

    // Read back a bounded number of records and print each of them.
    val consumed = Consumer
      .plainSource(consumerSettings.withClientId("client1"))
      .take(1000)
      .runForeach(println)

    consumed.toTry should be a 'success
  }

}
开发者ID:dnvriend,项目名称:reactive-kafka-test,代码行数:29,代码来源:ProducerConsumerTest.scala



注:本文中的akka.kafka.scaladsl.Consumer类示例整理自Github/MSDoc


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala ConsumerSettings类代码示例发布时间:2022-05-23
下一篇:
Scala ServletContainer类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap