本文整理汇总了Scala中org.apache.kafka.common.serialization.ByteArraySerializer类的典型用法代码示例。如果您正苦于以下问题:Scala ByteArraySerializer类的具体用法?Scala ByteArraySerializer怎么用?Scala ByteArraySerializer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ByteArraySerializer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ProcessingKafkaApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.duration._
/**
 * Demo application that materializes one closed graph containing two independent pipelines:
 *
 *  - a tick source emitting a fixed greeting every 5 seconds, mapped to a `ProducerRecord`
 *    and written to `kafkaTopic` through a plain Kafka sink;
 *  - a plain Kafka source assigned to partition `partition` of the same topic, whose record
 *    values are printed to standard output.
 */
object ProcessingKafkaApplication extends App {
  implicit val actorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  // Manual assignment of a single topic partition (no consumer-group subscription).
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("akka_streams_group")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Dot notation instead of postfix operators (`0 seconds`): postfix syntax needs
    // `scala.language.postfixOps`, which this file does not import.
    val tickSource = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
    val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
    val kafkaSink = Producer.plainSink(producerSettings)
    val printlnSink = Sink.foreach(println)

    // Records are created without a key; only topic and value are set.
    val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
    val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())

    tickSource ~> mapToProducerRecord ~> kafkaSink
    kafkaSource ~> mapFromConsumerRecord ~> printlnSink

    ClosedShape
  })

  runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala
示例2: Settings
//设置package包名称以及导入依赖的类
package com.scalaio.kafka.consumer
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.scalaio.kafka.consumer.Settings.consumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.Future
/**
 * Factory methods for the Kafka consumer and producer settings used by this example.
 * Both talk to the broker at `localhost:9092`, use byte-array keys and string values.
 */
object Settings {

  /** Consumer settings for group `CommittableSourceConsumer` with `auto.offset.reset = earliest`. */
  def consumerSettings(implicit system: ActorSystem) = {
    val keyDeserializer = new ByteArrayDeserializer
    val valueDeserializer = new StringDeserializer
    ConsumerSettings(system, keyDeserializer, valueDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("CommittableSourceConsumer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  }

  /** Producer settings with a byte-array key serializer and a string value serializer. */
  def producerSettings(implicit system: ActorSystem) = {
    val keySerializer = new ByteArraySerializer
    val valueSerializer = new StringSerializer
    ProducerSettings(system, keySerializer, valueSerializer)
      .withBootstrapServers("localhost:9092")
  }
}
/**
 * Consumes `topic1` through a committable source, hands every record value to
 * `BusinessController.handleMessage`, and commits the record's offset explicitly afterwards.
 */
object CommittableSource extends App {
  type KafkaMessage = CommittableMessage[Array[Byte], String]

  implicit val system = ActorSystem("CommittableSourceConsumerMain")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      // One commit action, shared by the success path and the recovery path.
      def commitOffset() = msg.committableOffset.commitScaladsl()

      BusinessController
        .handleMessage(msg.record.value)
        .flatMap(_ => commitOffset())
        // The offset is committed even when handling (or the first commit attempt) failed.
        .recoverWith { case _ => commitOffset() }
    }
    .runWith(Sink.ignore)
}
/** Stand-in for business logic: an asynchronous service that upper-cases its input. */
object BusinessController {

  /** An asynchronous function from `A` to `B`. */
  type Service[A, B] = A => Future[B]

  /** Returns an already-completed future holding the upper-cased message. */
  val handleMessage: Service[String, String] = { text =>
    val upperCased = text.toUpperCase
    Future.successful(upperCased)
  }
}
开发者ID:fagossa,项目名称:scalaio_akka,代码行数:56,代码来源:CommittableSource.scala
示例3: Producerable
//设置package包名称以及导入依赖的类
package producers
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.{Done, NotUsed}
import broker.ActorBroker
import config.AppConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.Future
/**
 * Mix-in for an actor that publishes string messages to a Kafka topic.
 *
 * `context`, `self`, `log`, `topicName`, `Receive` and an implicit execution context are not
 * defined here — presumably they come from `ActorBroker`; verify against that trait.
 */
trait Producerable extends ActorBroker {
  val config: AppConfig

  implicit val materializer = ActorMaterializer()

  // NOTE(review): this strict val dereferences the abstract `config` during trait
  // initialization; if an implementor defines `config` as a plain body `val`, it may still be
  // null here — confirm how implementors supply it.
  val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(s"${config.kafkaConfig.uri}:${config.kafkaConfig.port}")

  /** Behavior installed after a send: stops this actor when `Stop` is received. */
  def running(): Receive = {
    case Stop =>
      log.info("Stopping Kafka producer stream and actor")
      context.stop(self)
  }

  /**
   * Publishes `message` (without a key) to `topicName` through a one-element stream,
   * then switches the actor to the `running()` behavior.
   *
   * @param message the record value to publish
   */
  def sendToSink(message: String): Unit = {
    log.info(s"Attempting to produce message on topic $topicName")

    val record = new ProducerRecord[Array[Byte], String](topicName, message)

    // The record already exists, so emit it directly instead of wrapping it in a Future,
    // and keep only the sink's completion future (the only materialized value that is used).
    val completion: Future[Done] =
      Source.single(record).runWith(Producer.plainSink(producerSettings))

    // `failed.foreach` replaces the deprecated `onFailure`. The failure is only logged:
    // rethrowing inside a future callback reaches nobody but the execution context's
    // failure reporter, and nothing here restarts the stream.
    completion.failed.foreach { ex =>
      log.error("Stream failed due to error, restarting", ex)
    }

    context.become(running())
    log.info(s"Writer now running, writing random numbers to topic $topicName")
  }

  case object Stop
}
开发者ID:jguido,项目名称:reactive-kafka-registration,代码行数:48,代码来源:Producerable.scala
示例4: ProducerStream
//设置package包名称以及导入依赖的类
package com.omearac.producers
import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import com.omearac.shared.JsonMessageConversion.Conversion
import com.omearac.shared.{AkkaStreams, EventSourcing}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
/**
 * Building blocks (source, flow, sink) for a stream that publishes JSON-encoded messages
 * to Kafka.
 */
trait ProducerStream extends AkkaStreams with EventSourcing {
  implicit val system: ActorSystem
  def self: ActorRef

  /** A queue-backed source with buffer size `Int.MaxValue` and back-pressure on overflow. */
  def createStreamSource[msgType] = {
    Source.queue[msgType](Int.MaxValue, OverflowStrategy.backpressure)
  }

  /**
   * A plain Kafka sink for byte-array keys and string values.
   *
   * @param producerProperties must contain `"bootstrap-servers"`
   */
  def createStreamSink(producerProperties: Map[String, String]) = {
    val kafkaMBAddress = producerProperties("bootstrap-servers")
    val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(kafkaMBAddress)

    Producer.plainSink(producerSettings)
  }

  /**
   * A flow that converts each message to JSON and wraps it in a key-less `ProducerRecord`
   * addressed to a uniformly random partition in `[0, num.partitions)`.
   *
   * @param producerProperties must contain `"num.partitions"` (a positive integer) and
   *                           `"publish-topic"`
   * @throws IllegalArgumentException if `num.partitions` is not positive
   */
  def createStreamFlow[msgType: Conversion](producerProperties: Map[String, String]) = {
    val partitionCount = producerProperties("num.partitions").toInt
    // Fail fast on a bad configuration instead of failing the stream on its first element.
    require(partitionCount > 0, s"num.partitions must be positive, got $partitionCount")
    val topicToPublish = producerProperties("publish-topic")
    val rand = new scala.util.Random

    Flow[msgType].map { msg =>
      val partition = rand.nextInt(partitionCount)
      val stringJSONMessage = Conversion[msgType].convertToJson(msg)
      // The key is null because the Kafka Java API models "no key" that way.
      new ProducerRecord[Array[Byte], String](topicToPublish, partition, null, stringJSONMessage)
    }
  }
}
开发者ID:omearac,项目名称:reactive-kafka-microservice-template,代码行数:43,代码来源:ProducerStream.scala
示例5: ReactiveKafkaSingleConsumerMultipleProducerScala
//设置package包名称以及导入依赖的类
package org.rgcase.reactivekafka
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffsetBatch }
import akka.kafka.ProducerMessage.Message
import akka.kafka.scaladsl.{ Consumer, Producer }
import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer }
/**
 * Consumes `sourcetopic` with a committable source, forwards every record value to three
 * target topics in sequence, then commits the consumed offsets in batches.
 *
 * NOTE(review): this is a `class` extending `App`, not an `object`; it is unclear how it is
 * meant to be launched — confirm with callers before changing it.
 */
class ReactiveKafkaSingleConsumerMultipleProducerScala extends App {

  implicit val system = ActorSystem("reactivekafkascala")
  implicit val mat = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9093")

  val kafkaSource =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("sourcetopic"))

  /**
   * Builds a function that wraps a consumed message into a producer message for `topic`,
   * carrying the original committable message along as pass-through.
   */
  def toProducerMessage(topic: String) = (msg: CommittableMessage[Array[Byte], String]) =>
    Message[Array[Byte], String, CommittableMessage[Array[Byte], String]](new ProducerRecord(topic, msg.record.value), msg)

  /** Publishes each incoming message's value to `topic` and re-emits the original message. */
  private def producerFlowTo(topic: String) =
    Flow.fromFunction(toProducerMessage(topic)).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  val producerFlow1 = producerFlowTo("targettopic1")

  val producerFlow2 = producerFlowTo("targettopic2")

  val producerFlow3 = producerFlowTo("targettopic3")

  kafkaSource
    .via(producerFlow1)
    .via(producerFlow2)
    .via(producerFlow3)
    // Aggregate up to 20 offsets into one batch while the commit stage is busy.
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
      batch.updated(elem.committableOffset)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)
}
开发者ID:rgcase,项目名称:testplayground,代码行数:52,代码来源:ReactiveKafkaSingleConsumerMultipleProducerScala.scala
示例6: ReactiveProducer
//设置package包名称以及导入依赖的类
package co.s4n.reactiveKafka
import akka.actor.ActorSystem
import akka.kafka.ProducerMessage
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import akka.Done
import scala.util.{ Failure, Success }
/**
 * Publishes single string messages to the `UsersTopic` Kafka topic, reusing one
 * Kafka producer instance for every call.
 */
object ReactiveProducer {

  val system = ActorSystem("example")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer.create(system)

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  // Created once and handed to every sink built in `produce`.
  val kafkaProducer = producerSettings.createKafkaProducer()

  /**
   * Prints `msg` (preceded by a newline) and sends it, without a key, to `UsersTopic`.
   * The materialized completion future is discarded; `terminateWhenDone` is not invoked here.
   */
  def produce(msg: String): Unit = {
    val sharedProducerSink = Producer.plainSink(producerSettings, kafkaProducer)

    Source
      .single(msg)
      .map { payload =>
        println("\n" + payload)
        new ProducerRecord[Array[Byte], String]("UsersTopic", payload)
      }
      .runWith(sharedProducerSink)

    ()
  }

  /** Terminates the actor system once `result` completes, logging the error if it failed. */
  def terminateWhenDone(result: Future[Done]): Unit = {
    result.onComplete { outcome =>
      outcome.failed.foreach(e => system.log.error(e, e.getMessage))
      system.terminate()
    }
  }
}
开发者ID:bazzo03,项目名称:users-api,代码行数:51,代码来源:ReactiveProducer.scala
示例7: FlowProducerMain
//设置package包名称以及导入依赖的类
package com.example.producer
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Producer
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
/**
 * Sends the numbers 1 to 10 as strings to partition 0 of `topic1` through `Producer.flow`
 * and prints topic, partition, offset, value and pass-through for every produced record.
 */
object FlowProducerMain extends App {

  implicit val system = ActorSystem("FlowProducerMain")
  implicit val materializer = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val done = Source(1 to 10)
    .map { n =>
      // Every record goes to partition 0, with a null (absent) key.
      val partition = 0
      val outgoing = new ProducerRecord[Array[Byte], String]("topic1", partition, null, n.toString)
      // The number itself travels along as pass-through.
      ProducerMessage.Message(outgoing, n)
    }
    .via(Producer.flow(producerSettings))
    .map { result =>
      val record = result.message.record
      println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value}" +
        s"(${result.message.passThrough})")
      result
    }
    .runWith(Sink.ignore)
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:36,代码来源:FlowProducerMain.scala
示例8: CommitConsumerToFlowProducerMain
//设置package包名称以及导入依赖的类
package com.example.producer
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
/**
 * Copies record values from `topic1` to `topic2` and commits each consumed offset
 * after the corresponding record has passed through the producer flow.
 */
object CommitConsumerToFlowProducerMain extends App {

  implicit val system = ActorSystem("CommitConsumerToFlowProducerMain")
  implicit val materializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("CommitConsumerToFlowProducer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val done =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .map { msg =>
        println(s"topic1 -> topic2: $msg")
        val forwarded = new ProducerRecord[Array[Byte], String]("topic2", msg.record.value)
        // The committable offset rides along as pass-through so it can be committed downstream.
        ProducerMessage.Message(forwarded, msg.committableOffset)
      }
      .via(Producer.flow(producerSettings))
      .mapAsync(producerSettings.parallelism)(_.message.passThrough.commitScaladsl())
      .runWith(Sink.ignore)
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:41,代码来源:CommitConsumerToFlowProducerMain.scala
示例9: PlainSinkProducerMain
//设置package包名称以及导入依赖的类
package com.example.producer
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
/** Sends the numbers 1 to 10 as string values to `topic1` through a plain Kafka sink. */
object PlainSinkProducerMain extends App {

  implicit val system = ActorSystem("PlainSinkProducerMain")
  implicit val materializer = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val done = Source(1 to 10)
    .map { n =>
      val elem = n.toString
      println(s"PlainSinkProducer produce: ${elem}")
      // Key-less record: only topic and value are set.
      new ProducerRecord[Array[Byte], String]("topic1", elem)
    }
    .runWith(Producer.plainSink(producerSettings))
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:29,代码来源:PlainSinkProducerMain.scala
示例10: ConsumerToCommitableSinkProducerMain
//设置package包名称以及导入依赖的类
package com.example.producer
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
/**
 * Copies record values from `topic1` to `topic2`, handing the consumed offsets to
 * `Producer.commitableSink` as pass-through.
 */
object ConsumerToCommitableSinkProducerMain extends App {

  implicit val system = ActorSystem("Consumer2ProducerMain")
  implicit val materializer = ActorMaterializer()

  // TODO: move these hard-coded settings to application.conf
  val consumerSettings =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("Consumer2Producer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // TODO: move these hard-coded settings to application.conf
  val producerSettings =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map { msg =>
      println(s"topic1 -> topic2: $msg")
      val forwarded = new ProducerRecord[Array[Byte], String]("topic2", msg.record.value)
      ProducerMessage.Message(forwarded, msg.committableOffset)
    }
    .runWith(Producer.commitableSink(producerSettings))
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:42,代码来源:ConsumerToCommitableSinkProducerMain.scala
示例11: BaseStation2Kafka
//设置package包名称以及导入依赖的类
package com.darienmt.airplaneadventures.basestation.collector.streams
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer }
import scala.concurrent.Future
import io.circe._
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.syntax._
import io.circe.java8.time.{ encodeLocalDateDefault, encodeZonedDateTimeDefault }
import com.darienmt.airplaneadventures.basestation.collector.parsing.CirceEncoders._
/** Streams messages from a base-station source into a Kafka topic as compact JSON strings. */
object BaseStation2Kafka {

  /** Address and port of the base-station source. */
  case class SourceConfig(address: String, port: Int)

  /** Address and port of the Kafka broker plus the topic to publish to. */
  case class SinkConfig(address: String, port: Int, topic: String)

  /**
   * Runs the stream: every source element is encoded to JSON without whitespace and
   * published, without a key, to `sinkConfig.topic`.
   *
   * @return the materialized future of the Kafka sink
   */
  def apply(sourceConfig: SourceConfig, sinkConfig: SinkConfig)(implicit system: ActorSystem, materializer: ActorMaterializer): Future[Done] = {
    val messages = BaseStationSource(sourceConfig.address, sourceConfig.port)

    val brokerAddress = s"${sinkConfig.address}:${sinkConfig.port}"
    val kafkaSink = Producer.plainSink(
      ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
        .withBootstrapServers(brokerAddress)
    )

    messages
      .map { message =>
        val json = message.asJson.noSpaces
        new ProducerRecord[Array[Byte], String](sinkConfig.topic, json)
      }
      .runWith(kafkaSink)
  }
}
开发者ID:darienmt,项目名称:airplane-adventures,代码行数:35,代码来源:BaseStation2Kafka.scala
示例12: ProducerSettings
//设置package包名称以及导入依赖的类
import akka.kafka._
import akka.kafka.scaladsl._
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.serialization.ByteArraySerializer
// Producer settings with byte-array keys and string values; `system` must already be in scope.
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")

// Variant 1: a plain sink. `.to(...)` only describes the graph; this snippet never runs it.
Source(1 to 10000)
  .map(_.toString)
  .map(elem => new ProducerRecord[Array[Byte], String]("topic1", elem))
  .to(Producer.plainSink(producerSettings))

// Variant 2: a producer flow, which emits a result per record so processing can continue
// downstream. The number is carried as pass-through. This is likewise only a description.
Source(1 to 10000)
  .map(elem => ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic1", elem.toString), elem))
  .via(Producer.flow(producerSettings))
  .map { result =>
    val record = result.message.record
    // The closing parenthesis after the pass-through value was missing in the printed text.
    println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value} (${result.message.passThrough})")
    result
  }
开发者ID:PacktPublishing,项目名称:Fast-Data-Processing-Systems-with-SMACK-Stack,代码行数:20,代码来源:ProducerSettings.scala
示例13: KafkaEventProcessor
//设置package包名称以及导入依赖的类
package events
import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import com.google.inject.Inject
import events.KafkaEventProcessor.Init
import events.Serializer.EventSerializer
import models.KafkaEvents.{Event, Hello}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import play.api.{Configuration, Logger}
import scala.concurrent.ExecutionContextExecutor
/**
 * Actor that, on start-up, wires an actor-publisher source of `Event`s to a Kafka sink.
 * Broker and topic are read from configuration, defaulting to `localhost:9092` and `test`.
 */
class KafkaEventProcessor @Inject()(config: Configuration) extends Actor with ActorLogging {

  private val eventStream = context.system.eventStream
  implicit val materializer: ActorMaterializer = ActorMaterializer()(context.system)
  implicit val ctx: ExecutionContextExecutor = context.dispatcher

  private val server = config.getString("kafka.broker").getOrElse("localhost:9092")
  private val topic = config.getString("kafka.topic").getOrElse("test")

  /** Triggers producer creation by sending `Init` to itself. */
  override def preStart(): Unit = {
    super.preStart()
    self ! Init
    log.info("Start EventsProcessorActor")
  }

  /** Removes this actor from the system event stream before stopping. */
  override def postStop(): Unit = {
    eventStream.unsubscribe(self)
    super.postStop()
  }

  override def receive = {
    case Init => createProducer()
  }

  /** Builds and runs the stream from the `KafkaEventPublisher` source into Kafka. */
  private def createProducer() = {
    val kafkaSettings = ProducerSettings(context.system, new ByteArraySerializer(), new EventSerializer())
      .withBootstrapServers(server)
    val publishedEvents = Source.actorPublisher[Event](Props(classOf[KafkaEventPublisher]))

    publishedEvents
      // NOTE(review): only `Hello` is matched; any other `Event` subtype would raise a
      // MatchError here — confirm whether other subtypes exist.
      .map { case e: Hello => new ProducerRecord[Array[Byte], Event](topic, e) }
      .to(Producer.plainSink(kafkaSettings))
      .run()

    Logger.info("init producer")
  }
}
/** Companion holding the messages understood by the `KafkaEventProcessor` actor. */
object KafkaEventProcessor {
// Sent by the actor to itself in `preStart`; handling it creates the Kafka producer stream.
case object Init
}
开发者ID:fsanaulla,项目名称:Akka-Kafka-Producer,代码行数:61,代码来源:KafkaEventProcessor.scala
示例14: Config
//设置package包名称以及导入依赖的类
package com.kissthinker.kafka
import java.util.Properties
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
/**
 * Base Kafka client configuration: a `Properties` instance pre-populated with the
 * `bootstrap.servers` entry.
 *
 * @param kafkaAddress value stored under `bootstrap.servers`
 */
abstract class Config(kafkaAddress: String) extends Properties {
  setProperty("bootstrap.servers", kafkaAddress)
}
/**
 * Publisher-side configuration: on top of the base entries it sets the broker list,
 * byte-array serializers for both key and value, and an asynchronous producer type.
 * No `client.id` is set.
 *
 * @param kafkaAddress Kafka broker address, also used for `metadata.broker.list`
 */
class PublisherConfig(kafkaAddress: String) extends Config(kafkaAddress) {
  List(
    "metadata.broker.list" -> kafkaAddress,
    "key.serializer" -> classOf[ByteArraySerializer].getName,
    "value.serializer" -> classOf[ByteArraySerializer].getName,
    "producer.type" -> "async"
  ).foreach { case (key, value) => setProperty(key, value) }
}
/**
 * Subscriber-side configuration: on top of the base entries it sets the ZooKeeper
 * connection and timing values, a fixed consumer group, the offset reset policy,
 * the auto-commit interval and byte-array deserializers for both key and value.
 *
 * @param zookeeperAddress value stored under `zookeeper.connect`
 * @param kafkaAddress     Kafka broker address passed to the base configuration
 */
class SubscriberConfig(zookeeperAddress: String, kafkaAddress: String) extends Config(kafkaAddress) {
  List(
    "zookeeper.connect" -> zookeeperAddress,
    // The consumer group id is fixed to "2".
    "group.id" -> "2",
    "auto.offset.reset" -> "largest",
    "zookeeper.session.timeout.ms" -> "400",
    "zookeeper.sync.time.ms" -> "200",
    "auto.commit.interval.ms" -> "1000",
    "key.deserializer" -> classOf[ByteArrayDeserializer].getName,
    "value.deserializer" -> classOf[ByteArrayDeserializer].getName
  ).foreach { case (key, value) => setProperty(key, value) }
}
开发者ID:davidainslie,项目名称:kafka-kissthinker,代码行数:30,代码来源:Config.scala
示例15: ThrottolableConsumerFlowSpec
//设置package包名称以及导入依赖的类
package com.github.everpeace.reactive_kafka
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.Await
import scala.concurrent.duration._
/**
 * Integration spec that runs `throttolableConsumerFlow` against an embedded Kafka broker
 * and checks that one `Done` reaches the probe for every published message.
 */
class ThrottolableConsumerFlowSpec extends FlatSpec with Matchers with EmbeddedKafka with RandomPortSupport {
  val kafkaPort = temporaryServerPort()
  val zkPort = temporaryServerPort()
  val TOPIC = "topic1"

  // The consumer under test is configured through the actor system's config.
  implicit val system = ActorSystem("ThrottolableConsumerFlowSpec", ConfigFactory.parseString(
    s"""
      |throttolable-consumer {
      |  bootstrap-servers = "localhost:$kafkaPort"
      |  topic = "$TOPIC"
      |  group-id = "throttolable-consumer-flow-spec"
      |  throttle = 0
      |  offset-commit-batch-size = 2
      |  offset-commit-parallelism = 10
      |}
      |""".stripMargin))
  implicit val materializer = ActorMaterializer()
  implicit val kafkaConfig = EmbeddedKafkaConfig(kafkaPort, zkPort)
  implicit val byteArraySer = new ByteArraySerializer
  implicit val stringSer = new StringSerializer

  /** Generates `n` key/value pairs, each made of 10 random alphanumeric characters. */
  def createMsg(n: Int): Seq[(Array[Byte], String)] = {
    def gen = {
      val key = scala.util.Random.alphanumeric.take(10).mkString.getBytes()
      val msg = scala.util.Random.alphanumeric.take(10).mkString
      key -> msg
    }
    Seq.fill(n)(gen)
  }

  "FakeConsumerFlow" should "consume messages correctly" in withRunningKafka {
    // Use the shared constant so the created topic always matches the configured one.
    createCustomTopic(TOPIC)
    val p = TestProbe("sinkProbe")
    val control = throttolableConsumerFlow.toMat(Sink.actorRef(p.ref, Done))(Keep.left).run()

    // Fixed pause before publishing. NOTE(review): presumably this lets the consumer
    // start up first — confirm; a readiness signal would be more robust than sleeping.
    Thread.sleep(5.seconds.toMillis)

    val n = 100
    createMsg(n).foreach(kv => publishToKafka(TOPIC, kv._1, kv._2))
    (0 until n).foreach(_ => p.expectMsgType[Done])

    // Dot notation instead of postfix `1 minute`, which needs scala.language.postfixOps.
    Await.result(control.shutdown(), 1.minute)
  }
}
开发者ID:everpeace,项目名称:throttolable-perf-consumer,代码行数:62,代码来源:ThrottolableConsumerFlowSpec.scala
示例16: TestSpec
//设置package包名称以及导入依赖的类
package com.github.dnvriend
import akka.NotUsed
import akka.actor._
import akka.event.{Logging, LoggingAdapter}
import akka.kafka.{ConsumerSettings, ProducerSettings}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
/**
 * Base trait for the Kafka integration specs: provides an actor system, materializer,
 * logging, a 50-second patience config, and shared producer/consumer settings that point
 * at `boot2docker:9092`.
 */
trait TestSpec extends FlatSpec with Matchers with ScalaFutures with BeforeAndAfterAll {
  implicit val system: ActorSystem = ActorSystem()
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val mat: Materializer = ActorMaterializer()
  implicit val log: LoggingAdapter = Logging(system, this.getClass)
  implicit val pc: PatienceConfig = PatienceConfig(timeout = 50.seconds)

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("boot2docker:9092")

  val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer,
    Set("topic1"))
    .withBootstrapServers("boot2docker:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  /** Adds `toTry`, which waits for the future via `futureValue` and captures the outcome. */
  implicit class FutureToTry[T](f: Future[T]) {
    def toTry: Try[T] = Try(f.futureValue)
  }

  /**
   * Applies `f` to a source of consecutive integers counting up from `start`.
   *
   * @param start first integer emitted (default 0)
   * @param f     function that consumes the source
   * @return whatever `f` returns
   */
  def withIterator[T](start: Int = 0)(f: Source[Int, NotUsed] => T): T =
    f(Source.fromIterator(() => Iterator.from(start)))

  /** Terminates the actor system and asserts that termination succeeded. */
  override protected def afterAll(): Unit = {
    system.terminate()
    system.whenTerminated.toTry should be a 'success
  }
}
开发者ID:dnvriend,项目名称:reactive-kafka-test,代码行数:49,代码来源:TestSpec.scala
示例17: KafkaPublisher
//设置package包名称以及导入依赖的类
package io.scalac.newspaper.crawler.publishing
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Flow, Keep, Sink}
import io.scalac.newspaper.crawler.fetching.FetchingFlow.URLFetched
import io.scalac.newspaper.crawler.publishing.KafkaPublisher._
import io.scalac.newspaper.events.ContentFetched
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import scala.concurrent.Future
/**
 * A `Publisher` whose sink writes fetched pages to the Kafka topic named by `topic`
 * as `ContentFetched` events.
 */
trait KafkaPublisher extends Publisher {
  implicit def system: ActorSystem

  /** Name of the Kafka topic that receives the events. */
  def topic: String

  // No bootstrap servers are set here; NOTE(review): presumably they come from the
  // actor system's configuration — confirm.
  private def producerSettings =
    ProducerSettings(system, new ByteArraySerializer, new ContentFetchedSerializer)

  /** Sink that publishes every `URLFetched`, materializing the Kafka sink's future. */
  override def publish: Sink[URLFetched, Future[Done]] = {
    val kafkaSink = Producer.plainSink[Array[Byte], ContentFetched](producerSettings)

    Flow[URLFetched]
      // `fetched` becomes a ContentFetched via the implicit conversion imported from
      // the KafkaPublisher companion object.
      .map(fetched => new ProducerRecord[Array[Byte], ContentFetched](topic, fetched))
      .toMat(kafkaSink)(Keep.right)
  }
}
/** Companion providing the conversion from the crawler's `URLFetched` to the `ContentFetched` event. */
object KafkaPublisher {
/**
 * Implicitly converts a fetched URL into a `ContentFetched` event built from its
 * `url` and `content` fields.
 */
implicit def urlFetched2ContentFetched(source: URLFetched): ContentFetched =
ContentFetched(source.url, source.content)
}
开发者ID:ScalaConsultants,项目名称:newspaper,代码行数:33,代码来源:KafkaPublisher.scala
示例18: OutboundService
//设置package包名称以及导入依赖的类
package services
import javax.inject.Singleton
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import com.google.inject.ImplementedBy
import io.scalac.newspaper.events.SubscribeUser
import model.Subscriber
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer}
import serializers.SubscribeUserPBSerializer
import scala.concurrent.Future
/**
 * Outbound port for announcing new subscriptions.
 * The annotation names `KafkaOutboundService` as the implementation to inject.
 */
@ImplementedBy(classOf[KafkaOutboundService])
trait OutboundService {
/**
 * Publishes the given subscriber as a new subscription.
 *
 * @param s the subscriber to announce
 * @return a future of `Unit` for the publish request
 */
def publishNewSubscription(s: Subscriber): Future[Unit]
}
/**
 * `OutboundService` backed by Kafka: subscriptions are offered to a bounded in-memory queue
 * whose elements are published to the `newspaper-users` topic.
 */
@Singleton
class KafkaOutboundService() extends OutboundService {
  //TODO: move it outside and share among services
  implicit val system = ActorSystem("Newspaper-Analyzer-System")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new SubscribeUserPBSerializer)

  // Queue holding up to 100 pending events; the overflow strategy is `fail`.
  val source: SourceQueueWithComplete[SubscribeUser] =
    Source.queue[SubscribeUser](100, OverflowStrategy.fail)
      .map { sub =>
        new ProducerRecord[Array[Byte], SubscribeUser]("newspaper-users", sub)
      }
      .to(Producer.plainSink(producerSettings))
      .run()

  /**
   * Offers a `SubscribeUser` event (email plus name, or an empty name when absent) to the
   * publishing queue.
   *
   * @param sub the subscriber to announce
   * @return a future that succeeds only if the event was enqueued; it fails when the queue
   *         reports a failure, dropped the element, or was already closed
   */
  override def publishNewSubscription(sub: Subscriber): Future[Unit] = {
    import akka.stream.QueueOfferResult

    val domainObject = SubscribeUser(sub.email, sub.name.getOrElse(""))

    // Inspect the offer result instead of discarding it: the offer future can complete
    // normally with a non-enqueued result, which must not be reported as success.
    source.offer(domainObject).flatMap {
      case QueueOfferResult.Enqueued =>
        Future.successful(())
      case QueueOfferResult.Failure(cause) =>
        Future.failed(cause)
      case other =>
        Future.failed(new IllegalStateException(s"Subscription event was not enqueued: $other"))
    }
  }
}
开发者ID:ScalaConsultants,项目名称:newspaper,代码行数:48,代码来源:OutboundService.scala
示例19: AnalyzerRunner
//设置package包名称以及导入依赖的类
package io.scalac.newspaper.analyzer
import akka.actor.ActorSystem
import akka.kafka.ProducerMessage
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{Subscriptions, ConsumerSettings, ProducerSettings}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import io.scalac.newspaper.events._
/**
 * Reads `ContentFetched` events from `newspaper-content`, turns each into a
 * `ChangeDetected` event published to `newspaper`, and commits the consumed offset
 * once the record has passed through the producer flow.
 */
object AnalyzerRunner extends App {
  implicit val system = ActorSystem("Newspaper-Analyzer-System")
  implicit val materializer = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ContentFetchedDeserializer)
    .withGroupId("Newspaper-Analyzer")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ChangeDetectedSerializer)

  val subscription = Subscriptions.topics("newspaper-content")

  Consumer.committableSource(consumerSettings, subscription)
    .map { msg =>
      println(s"[ANALYZING] ${msg.record.value}")
      val fetched = msg.record.value
      // Every fetched page is reported as changed: url and content are copied over as-is.
      val detected = ChangeDetected(fetched.pageUrl, fetched.pageContent)
      val outgoing = new ProducerRecord[Array[Byte], ChangeDetected]("newspaper", detected)
      ProducerMessage.Message(outgoing, msg.committableOffset)
    }
    .via(Producer.flow(producerSettings))
    // Commit one offset at a time, in order.
    .mapAsync(1)(result => result.message.passThrough.commitScaladsl())
    .runWith(Sink.ignore)
}
开发者ID:ScalaConsultants,项目名称:newspaper,代码行数:41,代码来源:AnalyzerRunner.scala
示例20: KafkaPublisher
//设置package包名称以及导入依赖的类
package nl.tradecloud.kafka
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, Props, SupervisorStrategy}
import akka.kafka.ProducerSettings
import akka.pattern.BackoffSupervisor
import akka.stream.Materializer
import nl.tradecloud.kafka.command.Publish
import nl.tradecloud.kafka.config.KafkaConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
/**
 * Facade for publishing messages to Kafka through a `KafkaPublisherActor` that runs under
 * a back-off supervisor.
 *
 * @param system  actor system supplying the configuration and the Kafka dispatcher
 * @param mat     implicit materializer
 * @param context factory used to create the supervising actor
 */
class KafkaPublisher(system: ActorSystem)(implicit mat: Materializer, context: ActorRefFactory) {
  import KafkaPublisher._

  implicit val dispatcher: ExecutionContext = system.dispatchers.lookup("dispatchers.kafka-dispatcher")

  val kafkaConfig = KafkaConfig(system.settings.config)

  // Taken from the shared counter on first access; used in both actor names below.
  private lazy val publisherId = KafkaClientIdSequenceNumber.getAndIncrement

  // String keys, byte-array values.
  private def publisherSettings =
    ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
      .withBootstrapServers(kafkaConfig.brokers)

  private val publisherProps: Props = KafkaPublisherActor.props(kafkaConfig, publisherSettings)

  // Back-off between 3 and 30 seconds, random factor 1.0, with the stopping strategy.
  private val backoffPublisherProps: Props = BackoffSupervisor.propsWithSupervisorStrategy(
    publisherProps,
    s"KafkaPublisherActor$publisherId",
    3.seconds,
    30.seconds,
    1.0,
    SupervisorStrategy.stoppingStrategy
  )

  private val publishActor = context.actorOf(backoffPublisherProps, s"KafkaBackoffPublisher$publisherId")

  /**
   * Sends a `Publish` command carrying a fresh promise to the publisher actor.
   *
   * @param topic destination topic
   * @param msg   message to publish
   * @return the future of the promise handed to the actor
   */
  def publish(topic: String, msg: AnyRef): Future[Done] = {
    val completion = Promise[Done]()
    publishActor ! Publish(topic, msg, completion)
    completion.future
  }
}
/** Companion holding state shared by all `KafkaPublisher` instances. */
object KafkaPublisher {
// Shared counter, starting at 1, from which each publisher takes the id used in its actor names.
private val KafkaClientIdSequenceNumber = new AtomicInteger(1)
}
开发者ID:tradecloud,项目名称:kafka-akka-extension,代码行数:53,代码来源:KafkaPublisher.scala
注:本文中的org.apache.kafka.common.serialization.ByteArraySerializer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论