This page collects typical usage examples of the Scala class akka.kafka.Subscriptions. If you are wondering what the Subscriptions class is for, how to use it, or what it looks like in practice, the curated examples below should help.
Twenty code examples of the Subscriptions class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help the system recommend better Scala code samples.
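Before the examples, a quick orientation: the sketch below (the editor's own, assuming the akka-stream-kafka 0.x API used throughout this page) shows the four Subscriptions factory methods the samples rely on.

import akka.kafka.Subscriptions
import org.apache.kafka.common.TopicPartition

// Group-managed subscriptions: Kafka assigns partitions within the consumer group
val byTopics  = Subscriptions.topics("topic-a", "topic-b")
val byPattern = Subscriptions.topicPattern("topic-.*")
// Manual assignment: fixed partitions, no group rebalancing
val byAssignment = Subscriptions.assignment(new TopicPartition("topic-a", 0))
val byOffset     = Subscriptions.assignmentWithOffset(new TopicPartition("topic-a", 0) -> 42L)

The group-managed variants take part in consumer-group rebalancing, while the assignment variants pin concrete partitions; both styles appear in the examples below.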
Example 1: ProcessingKafkaApplication
// Package declaration and imported dependencies
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.duration._
object ProcessingKafkaApplication extends App {
implicit val actorSystem = ActorSystem("SimpleStream")
implicit val actorMaterializer = ActorMaterializer()
val bootstrapServers = "localhost:9092"
val kafkaTopic = "akka_streams_topic"
val partition = 0
val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))
val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("akka_streams_group")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val tickSource = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
val kafkaSink = Producer.plainSink(producerSettings)
val printlnSink = Sink.foreach(println)
val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())
tickSource ~> mapToProducerRecord ~> kafkaSink
kafkaSource ~> mapFromConsumerRecord ~> printlnSink
ClosedShape
})
runnableGraph.run()
}
Author: PacktPublishing, Project: Akka-Cookbook, Lines: 51, Source: ProcessingKafkaApplication.scala
Example 2: Settings
// Package declaration and imported dependencies
package com.scalaio.kafka.consumer
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.scalaio.kafka.consumer.Settings.consumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.Future
object Settings {
def consumerSettings(implicit system: ActorSystem) =
ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("CommittableSourceConsumer")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
def producerSettings(implicit system: ActorSystem) =
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
}
object CommittableSource extends App {
type KafkaMessage = CommittableMessage[Array[Byte], String]
implicit val system = ActorSystem("CommittableSourceConsumerMain")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// explicit commit
Consumer
.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
BusinessController.handleMessage(msg.record.value)
.flatMap(_ => msg.committableOffset.commitScaladsl())
.recoverWith { case _ => msg.committableOffset.commitScaladsl() } // commit even when handling fails, to avoid reprocessing
}
.runWith(Sink.ignore)
}
object BusinessController {
type Service[A, B] = A => Future[B]
val handleMessage: Service[String, String] =
(message) => Future.successful(message.toUpperCase)
}
Author: fagossa, Project: scalaio_akka, Lines: 56, Source: CommittableSource.scala
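The mapAsync-plus-commitScaladsl pattern in Example 2 is the classic 0.x way to get at-least-once delivery. Later connector releases (alpakka-kafka 1.0+) express the same flow with a dedicated committer sink; a minimal sketch assuming that newer API and reusing consumerSettings and the implicit dispatcher from Example 2:

import akka.kafka.CommitterSettings
import akka.kafka.scaladsl.Committer

// process each message, then hand its offset to a batching committer sink
Consumer
  .committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1) { msg =>
    BusinessController.handleMessage(msg.record.value).map(_ => msg.committableOffset)
  }
  .runWith(Committer.sink(CommitterSettings(system)))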
Example 3: PacketConsumer
// Package declaration and imported dependencies
package edu.uw.at.iroberts.wirefugue.kafka.consumer
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.typesafe.config.ConfigFactory
import edu.uw.at.iroberts.wirefugue.kafka.producer.KafkaKey
import edu.uw.at.iroberts.wirefugue.kafka.serdes.{PacketDeserializer, PacketSerde}
import edu.uw.at.iroberts.wirefugue.pcap.Packet
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.IntegerDeserializer
import scala.concurrent.Await
import scala.concurrent.duration._
object PacketConsumer extends App {
type PacketRecord = ConsumerRecord[Integer, Packet] // matches the Integer/Packet deserializers below
val config = ConfigFactory.load("application.conf")
implicit val system = ActorSystem("stream-consumer-system", config)
implicit val materializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new IntegerDeserializer, new PacketDeserializer)
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
// Separate streams for each partition
val maxPartitions = 100
val consumerGroup = Consumer.plainPartitionedSource(consumerSettings, Subscriptions.topics("packets"))
val done = consumerGroup.map {
case (topicPartition, source) =>
val p: Int = topicPartition.partition
source
.map { (cr: PacketRecord) => cr.value() }
.filter(_.ip.isDefined)
.toMat(Sink.foreach(packet => println(s"[$p] $packet")))(Keep.both)
.run()
}
.mapAsyncUnordered(maxPartitions)(_._2)
.runWith(Sink.ignore)
Await.result(done, Duration.Inf)
system.terminate()
}
Author: robertson-tech, Project: wirefugue, Lines: 50, Source: PacketConsumer.scala
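Example 3 uses plainPartitionedSource, so no offsets are ever committed. When per-partition processing must also commit offsets, the connector offers a committable variant; a sketch under the same settings and implicits as Example 3 (the commit-per-message policy is an illustrative choice):

Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("packets"))
  .mapAsyncUnordered(maxPartitions) { case (topicPartition, source) =>
    // each inner source carries exactly one partition's messages
    source
      .mapAsync(1)(msg => msg.committableOffset.commitScaladsl())
      .runWith(Sink.ignore)
  }
  .runWith(Sink.ignore)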
Example 4: StreamConsumer
// Package declaration and imported dependencies
package consumers
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import cats.data.Xor
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import io.circe._
import io.circe.generic.auto._
import cats.data.Xor.{Left, Right}
import model.Employee
import scala.concurrent.Future
object StreamConsumer extends App {
implicit val actorSystem = ActorSystem("consumer-actors", ConfigFactory.load())
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem))
lazy val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group13")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // alternatively "latest"
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
lazy val subscription = Subscriptions.topics("raw-data-1")
lazy val db = new Processor()
Consumer.plainSource(consumerSettings, subscription)
.mapAsync(4){
db.processMessage
}
.runWith(Sink.ignore)
}
class Processor {
def processMessage(record: ConsumerRecord[Array[Byte], String]): Future[Done] = {
println(s"DB.save: ${record.value()}")
Option(record.value()).foreach { jsonString =>
val mayBeEmp: Xor[Error, Employee] = jawn.decode[Employee](jsonString)
mayBeEmp match {
case Left(error) => println(error)
case Right(emp) => println(s"employee name: ${emp.name}")
}
}
Future.successful(Done)
}
}
Author: ajit-scala, Project: kafka-consumers, Lines: 55, Source: StreamConsumer.scala
Example 5: TopicHandler
// Package declaration and imported dependencies
package org.hpi.esb.datavalidator.kafka
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.hpi.esb.util.OffsetManagement
case class TopicHandler(topicName: String, numberOfMessages: Long, topicSource: Source[ConsumerRecord[String, String], Consumer.Control])
object TopicHandler {
def create(topicName: String, system: ActorSystem): TopicHandler = {
val uuid = java.util.UUID.randomUUID.toString
val consumerSettings: ConsumerSettings[String, String] = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("192.168.30.208:9092,192.168.30.207:9092,192.168.30.141:9092")
.withGroupId(uuid)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Int.MaxValue.toString)
.withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Int.MaxValue.toString)
.withProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20485000")
val partition = 0
val topicSource = createSource(consumerSettings, topicName, partition)
val numberOfMessages = OffsetManagement.getNumberOfMessages(topicName, partition)
new TopicHandler(topicName, numberOfMessages, topicSource)
}
def createSource(consumerSettings: ConsumerSettings[String, String], topicName: String, partition: Int) = {
val subscription = Subscriptions.assignmentWithOffset(
new TopicPartition(topicName, partition) -> 0L
)
Consumer.plainSource(consumerSettings, subscription)
}
}
Author: BenReissaus, Project: EnterpriseStreamingBenchmark, Lines: 42, Source: TopicHandler.scala
Example 6: Main
// Package declaration and imported dependencies
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.Future
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem.apply("akka-stream-kafka")
implicit val materializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092;localhost:9093")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
// complete the commit before emitting the message downstream
msg.committableOffset.commitScaladsl().map(_ => msg)(system.dispatcher)
}
.runForeach(msg => println(s"partition: ${msg.record.partition}; value: ${msg.record.value}"))
}
}
Author: kczulko, Project: akka-streams-kafka, Lines: 28, Source: Main.scala
Example 7: ReactiveKafkaSingleConsumerMultipleProducerScala
// Package declaration and imported dependencies
package org.rgcase.reactivekafka
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffsetBatch }
import akka.kafka.ProducerMessage.Message
import akka.kafka.scaladsl.{ Consumer, Producer }
import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer }
object ReactiveKafkaSingleConsumerMultipleProducerScala extends App {
implicit val system = ActorSystem("reactivekafkascala")
implicit val mat = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9093")
val kafkaSource =
Consumer.committableSource(consumerSettings, Subscriptions.topics("sourcetopic"))
def toProducerMessage(topic: String) = (msg: CommittableMessage[Array[Byte], String]) =>
Message[Array[Byte], String, CommittableMessage[Array[Byte], String]](new ProducerRecord(topic, msg.record.value), msg)
val producerFlow1 =
Flow.fromFunction(toProducerMessage("targettopic1")).via(Producer.flow(producerSettings)).map(_.message.passThrough)
val producerFlow2 =
Flow.fromFunction(toProducerMessage("targettopic2")).via(Producer.flow(producerSettings)).map(_.message.passThrough)
val producerFlow3 =
Flow.fromFunction(toProducerMessage("targettopic3")).via(Producer.flow(producerSettings)).map(_.message.passThrough)
kafkaSource
.via(producerFlow1)
.via(producerFlow2)
.via(producerFlow3)
.batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
batch.updated(elem.committableOffset)
}.mapAsync(3)(_.commitScaladsl())
.runWith(Sink.ignore)
}
Author: rgcase, Project: testplayground, Lines: 52, Source: ReactiveKafkaSingleConsumerMultipleProducerScala.scala
Example 8: CommitConsumerToFlowProducerMain
// Package declaration and imported dependencies
package com.example.producer
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
object CommitConsumerToFlowProducerMain extends App {
implicit val system = ActorSystem("CommitConsumerToFlowProducerMain")
implicit val materializer = ActorMaterializer()
val consumerSettings =
ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("CommitConsumerToFlowProducer")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val done =
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.map { msg =>
println(s"topic1 -> topic2: $msg")
ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
"topic2",
msg.record.value
), msg.committableOffset)
}
.via(Producer.flow(producerSettings))
.mapAsync(producerSettings.parallelism) { result =>
result.message.passThrough.commitScaladsl()
}
.runWith(Sink.ignore)
}
Author: makersu, Project: reactive-kafka-scala-example, Lines: 41, Source: CommitConsumerToFlowProducerMain.scala
Example 9: ConsumerToCommitableSinkProducerMain
// Package declaration and imported dependencies
package com.example.producer
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
object ConsumerToCommitableSinkProducerMain extends App {
implicit val system = ActorSystem("Consumer2ProducerMain")
implicit val materializer = ActorMaterializer()
//TODO: move to configuration application.conf
val consumerSettings =
ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("Consumer2Producer")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
//TODO: move to configuration application.conf
val producerSettings =
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.map { msg =>
println(s"topic1 -> topic2: $msg")
ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
"topic2",
msg.record.value
), msg.committableOffset)
}
.runWith(Producer.commitableSink(producerSettings))
}
Author: makersu, Project: reactive-kafka-scala-example, Lines: 42, Source: ConsumerToCommitableSinkProducerMain.scala
Example 10: produceRecord
// Package declaration and imported dependencies
package de.choffmeister.microserviceutils.kafka.testkit
import java.util.UUID
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import scala.concurrent.Future
trait KafkaTest {
def produceRecord[K, V](topic: String, keySerializer: Serializer[K], valueSerializer: Serializer[V], key: K, value: V)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
val producerSettings = ProducerSettings(system, keySerializer, valueSerializer)
.withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
Source.single(new ProducerRecord("mail.command.send", key, value))
.toMat(Producer.plainSink(producerSettings))(Keep.right)
.run()
}
def consumeRecordPF[K, V, Out](topic: String, keyDeserializer: Deserializer[K], valueDeserializer: Deserializer[V])(pf: PartialFunction[ConsumerRecord[K, V], Out])(implicit system: ActorSystem, mat: Materializer): Future[Out] = {
val consumerSettings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
.withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
.withGroupId(UUID.randomUUID.toString)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
.filter(pf.isDefinedAt)
.map(pf)
.toMat(Sink.head)(Keep.right)
.run()
}
}
Author: choffmeister, Project: microservice-utils, Lines: 40, Source: KafkaTest.scala
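For context, a hypothetical spec built on the two helpers above; the topic name, key, and String serializers are illustrative assumptions, not part of the original trait:

import akka.actor.ActorSystem
import akka.stream.Materializer
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import scala.concurrent.Future

class RoundTripSpec extends KafkaTest {
  // publish one record, then complete the returned future once it is consumed back
  def roundTrip()(implicit system: ActorSystem, mat: Materializer): Future[String] = {
    produceRecord("mail.command.send", new StringSerializer, new StringSerializer, "key-1", "hello")
    consumeRecordPF("mail.command.send", new StringDeserializer, new StringDeserializer) {
      case record if record.key() == "key-1" => record.value()
    }
  }
}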
Example 11: Config
// Package declaration and imported dependencies
package com.github.kliewkliew.cornucopia.kafka
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Producer, Consumer => ConsumerDSL}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
object Config {
object Cornucopia {
private val config = ConfigFactory.load().getConfig("cornucopia")
val minReshardWait = config.getInt("reshard.interval").seconds
val gracePeriod = config.getInt("grace.period") * 1000
val refreshTimeout = config.getInt("refresh.timeout") * 1000
val batchPeriod = config.getInt("batch.period").seconds
}
object Consumer {
private val kafkaConfig = ConfigFactory.load().getConfig("kafka")
private val kafkaServers = kafkaConfig.getString("bootstrap.servers")
private val kafkaConsumerConfig = kafkaConfig.getConfig("consumer")
private val topic = kafkaConsumerConfig.getString("topic")
private val groupId = kafkaConsumerConfig.getString("group.id")
implicit val actorSystem = ActorSystem()
// Log failures and resume processing
private val decider: Supervision.Decider = { e =>
LoggerFactory.getLogger(this.getClass).error("Failed to process event", e)
Supervision.Resume
}
private val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
private val sourceSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(kafkaServers)
.withGroupId(groupId)
private val subscription = Subscriptions.topics(topic)
private val sinkSettings = ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
.withBootstrapServers(kafkaServers)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
val cornucopiaSource = ConsumerDSL.plainSource(sourceSettings, subscription)
val cornucopiaSink = Producer.plainSink(sinkSettings)
}
}
Author: kliewkliew, Project: cornucopia, Lines: 51, Source: Config.scala
Example 12: ServiceKafkaConsumer
// Package declaration and imported dependencies
package services
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.Configuration
import scala.concurrent.Future
class ServiceKafkaConsumer(topicNames: Set[String], groupName: String, implicit val mat: Materializer,
actorSystem: ActorSystem, configuration: Configuration, handleEvent: String => Unit) {
val config = configuration.getConfig("kafka")
.getOrElse(throw new Exception("No config element for kafka!"))
.underlying
val consumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(config.getString("bootstrap.servers"))
.withGroupId(groupName)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getString("auto.offset.reset"))
Consumer.committableSource(consumerSettings,
Subscriptions.topics(topicNames)).mapAsync(1) {
msg=>
val event = msg.record.value()
handleEvent(event)
Future.successful(msg)
}.mapAsync(1) { msg =>
msg.committableOffset.commitScaladsl()
}.runWith(Sink.ignore)
}
Author: getArtemUsername, Project: play-and-events, Lines: 37, Source: ServiceKafkaConsumer.scala
Example 13: KafkaSource
// Package declaration and imported dependencies
package com.thenetcircle.event_dispatcher.source
import akka.kafka.ConsumerMessage.{ CommittableOffset, CommittableOffsetBatch }
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ AutoSubscription, Subscriptions }
import akka.stream.scaladsl.{ Flow, Source }
import akka.{ Done, NotUsed }
import com.thenetcircle.event_dispatcher.driver.adapter.KafkaSourceAdapter
import com.thenetcircle.event_dispatcher.driver.extractor.Extractor
import com.thenetcircle.event_dispatcher.{ Event, EventFmt }
object KafkaSource {
def atLeastOnce[Fmt <: EventFmt](
settings: KafkaSourceSettings
)(implicit extractor: Extractor[Fmt]): Source[Event, Consumer.Control] = {
val consumerName = settings.name
val consumerSettings = settings.consumerSettings
val subscription: AutoSubscription = if (settings.topics.isDefined) {
Subscriptions.topics(settings.topics.get)
} else if (settings.topicPattern.isDefined) {
Subscriptions.topicPattern(settings.topicPattern.get)
} else {
throw new IllegalArgumentException("Kafka source needs topics or a topic pattern to subscribe to")
}
Consumer
.committableSource(consumerSettings, subscription)
.map(msg => {
KafkaSourceAdapter.fit(msg.record).addContext("committableOffset", msg.committableOffset)
})
.map(extractor.extract)
}
def commit(parallelism: Int = 3, batchMax: Int = 20): Flow[Event, Done, NotUsed] =
Flow[Event]
.map(_.rawEvent.context("committableOffset").asInstanceOf[CommittableOffset])
.batch(max = batchMax, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
batch.updated(elem)
}
.mapAsync(parallelism)(_.commitScaladsl())
}
Author: thenetcircle, Project: event-dispatcher, Lines: 45, Source: KafkaSource.scala
Example 14: Main
// Package declaration and imported dependencies
package connector
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.duration._
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val kafkaConsumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val kafkaSubscription = Subscriptions.topics("input")
val maxItemsInBatch = 10
Consumer.plainSource(kafkaConsumerSettings, kafkaSubscription)
.groupedWithin(maxItemsInBatch, 10000.milliseconds)
.runForeach(batch => persist(batch))
}
// TODO: Send batch to Elasticsearch
def persist(batch: Seq[ConsumerRecord[Array[Byte], String]]): Unit = {
batch foreach println
}
}
Author: jozi-k, Project: kafka-to-es-akka, Lines: 36, Source: Main.scala
Example 15: KafkaConsumerExample
// Package declaration and imported dependencies
package com.benencahill.akka.streams
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.RunnableGraph
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
import scala.util.Success
object KafkaConsumerExample extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val context = system.dispatcher
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("testing-akka-streams")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val kafkaSource = Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("testing-akka-streams"))
.map( record => record.value())
val deserialize = Flow[String].mapAsync(1){ data =>
import UserEventJsonProtocol._
Unmarshal(data.mkString).to[UserEvent]
}
val print = Sink.foreach[UserEvent](event => println(event))
val blueprint: RunnableGraph[Future[Done]] = kafkaSource.take(100).via(deserialize).toMat(print)(Keep.right)
val materialized = blueprint.run()
materialized.andThen { case Success(_) => system.terminate() }
}
Author: benen, Project: akka-streams-intro, Lines: 44, Source: KafkaConsumerExample.scala
Example 16: throttolableConsumerFlow
// Package declaration and imported dependencies
package com.github.everpeace
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.duration.Duration
package object reactive_kafka {
def throttolableConsumerFlow(implicit system: ActorSystem, mat: ActorMaterializer): Source[Done, Consumer.Control] = {
val c = system.settings.config.getConfig("throttolable-consumer")
implicit val ec = system.dispatcher
val bootstrapServers = c getString "bootstrap-servers"
val topic = c getString "topic"
val autoOffsetReset = c getString "auto-offset-reset"
val groupId = c getString "group-id"
val throttle = c getInt "throttle"
val throttlePer = Duration.fromNanos((c getDuration "throttle-per").toNanos)
val throttleBurst = c getInt "throttle-burst"
val logPer = c getInt "log-per"
val offsetCommitBatchSize = c getInt "offset-commit-batch-size"
val offsetCommitParallelism = c getInt "offset-commit-parallelism"
val consumerSettings =
ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(topic)))
val throttled = if (throttle > 0)
source.throttle(throttle, throttlePer, throttleBurst, ThrottleMode.shaping)
else source
throttled.statefulMapConcat(() => {
var counter = 0
msg => {
if (counter % logPer == 0) {
system.log.info(s"FakeConsumer consume: $msg")
counter = 0
}
counter += 1
msg :: Nil
}
}).batch(max = offsetCommitBatchSize, m => CommittableOffsetBatch.empty.updated(m.committableOffset))((batch, m) => batch.updated(m.committableOffset))
.mapAsync(offsetCommitParallelism) { batch =>
batch.commitScaladsl()
}
}
}
Author: everpeace, Project: throttolable-perf-consumer, Lines: 60, Source: package.scala
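The flow above pulls every knob from a throttolable-consumer config block; the keys it reads imply an application.conf section along these lines (the values are illustrative assumptions):

throttolable-consumer {
  bootstrap-servers = "localhost:9092"
  topic = "test-topic"
  auto-offset-reset = "earliest"
  group-id = "throttolable-consumer-group"
  throttle = 1000            # elements per window; 0 or less disables throttling
  throttle-per = 1s
  throttle-burst = 100
  log-per = 1000
  offset-commit-batch-size = 20
  offset-commit-parallelism = 3
}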
Example 17: RequestUriToGeoCoordinateTransformer
// Package declaration and imported dependencies
package services
import akka.actor._
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
object RequestUriToGeoCoordinateTransformer extends App {
implicit val system = ActorSystem("Request-Processor")
implicit val materializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("RequestPathConsumer")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
Consumer.plainSource(consumerSettings, Subscriptions.topics("requestpathdata"))
.map(r => GeoCoordinatesService.enrich(r.value()))
.collect { case Some(geo) => geo }
.map(convertToRecord)
.runWith(Producer.plainSink(producerSettings))
def convertToRecord(geo: GeoCoordinate): ProducerRecord[String, String] = {
new ProducerRecord[String, String]("geocoordinatedata", geo.serializeToString())
}
}
Author: aerohit, Project: TravellingWhere, Lines: 33, Source: RequestUriToGeoCoordinateTransformer.scala
Example 18: StartConsuming
// Package declaration and imported dependencies
package services
import actors.SubscribableActor
import akka.Done
import akka.actor.Actor
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
case object StartConsuming
case object RegisterListener
case object UnRegisterListener
class GeoCoordinatesKafkaConsumer(implicit val materializer: ActorMaterializer)
extends Actor with SubscribableActor[GeoCoordinate] {
var hasStartedConsuming = false
override def receive = {
case StartConsuming if !hasStartedConsuming =>
hasStartedConsuming = true
startConsumptionTask()
case RegisterListener =>
subscribe(sender())
case UnRegisterListener =>
unsubscribe(sender())
}
private def startConsumptionTask(): Future[Done] = {
Consumer.plainSource(consumerSettings, Subscriptions.topics("geocoordinatedata"))
.map(r => GeoCoordinate.parseFromString(r.value()))
.collect { case Some(geo) => geo }
.runForeach(notifySubscribers)
}
private val consumerSettings =
ConsumerSettings(context.system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("GeocoordinateConsumer")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}
Author: aerohit, Project: TravellingWhere, Lines: 47, Source: GeoCoordinatesKafkaConsumer.scala
Example 19: terminateWhenDone
// Package declaration and imported dependencies
package org.biosphere.labs.akka.learning.kafka
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
// https://github.com/akka/reactive-kafka/blob/master/docs/src/test/scala/sample/scaladsl/ConsumerExample.scala
// http://doc.akka.io/docs/akka-stream-kafka/current/consumer.html
// https://github.com/jvwilge/akka-stream-kafka-getting-started/blob/master/src/main/scala/net/jvw/Main.scala
trait ConsumerQuickStart {
implicit val system = ActorSystem.create("ConsumerQuickStart")
implicit val executor: ExecutionContext = system.dispatcher
implicit val mat = ActorMaterializer()
val maxPartitions = 100
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("172.17.0.2:9092")
.withGroupId("ConsumerQuickStartGroup")
.withClientId("ConsumerQuickStartClient")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
def terminateWhenDone(result: Future[Done]): Unit = {
result.onComplete {
case Failure(e) =>
system.log.error(e, e.getMessage)
system.terminate()
case Success(_) => system.terminate()
}
}
}
object AtLeastOnceQuickStart extends ConsumerQuickStart {
def main(args: Array[String]): Unit = {
val done = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.map(msg => {
println(msg)
})
.runWith(Sink.ignore)
terminateWhenDone(done)
}
}
Author: fernandohackbart, Project: akka-learning, Lines: 56, Source: ConsumerQuickStart.scala
Example 20: AnalyzerRunner
// Package declaration and imported dependencies
package io.scalac.newspaper.analyzer
import akka.actor.ActorSystem
import akka.kafka.ProducerMessage
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{Subscriptions, ConsumerSettings, ProducerSettings}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import io.scalac.newspaper.events._
object AnalyzerRunner extends App {
implicit val system = ActorSystem("Newspaper-Analyzer-System")
implicit val materializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ContentFetchedDeserializer)
.withGroupId("Newspaper-Analyzer")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(system, new ByteArraySe