本文整理汇总了Scala中akka.kafka.scaladsl.Producer类的典型用法代码示例。如果您正苦于以下问题:Scala Producer类的具体用法?Scala Producer怎么用?Scala Producer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Producer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ProcessingKafkaApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.duration._
object ProcessingKafkaApplication extends App {
implicit val actorSystem = ActorSystem("SimpleStream")
implicit val actorMaterializer = ActorMaterializer()
val bootstrapServers = "localhost:9092"
val kafkaTopic = "akka_streams_topic"
val partition = 0
val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))
val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("akka_streams_group")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val tickSource = Source.tick(0 seconds, 5 seconds, "Hello from Akka Streams using Kafka!")
val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
val kafkaSink = Producer.plainSink(producerSettings)
val printlnSink = Sink.foreach(println)
val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())
tickSource ~> mapToProducerRecord ~> kafkaSink
kafkaSource ~> mapFromConsumerRecord ~> printlnSink
ClosedShape
})
runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala
示例2: KafkaProducer
//设置package包名称以及导入依赖的类
package com.marwanad.actor
import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.marwanad.model.SerializableCommentContent
import org.apache.kafka.clients.producer.ProducerRecord
import com.marwanad.serializer.CommentContentJsonProtocol._
import com.marwanad.util.log.Timber
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import spray.json.JsonWriter
class KafkaProducer(mat: Materializer, topic: String, bootstrap: String) extends Actor with Timber {
implicit lazy val materializer = mat
override def preStart(): Unit = {
logActorPreStart("KafkaWriter")
super.preStart()
self ! "Start"
}
private def toJson[T](element: T)(implicit writer: JsonWriter[T]): String = {
writer.write(element).toString
}
override def receive: Receive = {
case "Start" =>
logActorMessageReceived("KafkaWriter received Start")
val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(bootstrap)
val ele = Source.actorPublisher[SerializableCommentContent](CommentPublisher.props)
.map(toJson(_))
.map { elem =>
new ProducerRecord[Array[Byte], String](topic, elem)
}
.runWith(Producer.plainSink(producerSettings))
}
}
object KafkaProducer {
def props(mat: Materializer, topic: String, bootstrap: String): Props = Props(new KafkaProducer(mat, topic, bootstrap))
}
开发者ID:marwanad,项目名称:UofR,代码行数:47,代码来源:KafkaProducer.scala
示例3: Main
//设置package包名称以及导入依赖的类
package edu.uw.at.iroberts.wirefugue.sensor
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Sink}
import com.typesafe.config.ConfigFactory
import edu.uw.at.iroberts.wirefugue.pcap.PcapFileRaw.LinkType
import edu.uw.at.iroberts.wirefugue.pcap._
import edu.uw.at.iroberts.wirefugue.kafka.producer.{KafkaKey, PacketProducer}
import edu.uw.at.iroberts.wirefugue.kafka.serdes.PacketSerializer
import edu.uw.at.iroberts.wirefugue.protocol.overlay.{Ethernet, IPV4Datagram}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, IntegerSerializer}
import scala.concurrent.Await
import scala.concurrent.duration._
object Main {
def main(args: Array[String]): Unit = {
if (args.length < 1) {
println("Please specify a filename as the first argument")
System.exit(1)
}
val config = ConfigFactory.load("application.conf")
implicit val system = ActorSystem("stream-producer-system", config)
implicit val materializer = ActorMaterializer()
val producerSettings = ProducerSettings[Integer, Packet](system, None, None)
val doneF = PcapSource(Paths.get(args(0)).toUri)
.filter( p => p.network == LinkType.ETHERNET && p.ip.isDefined )
.map( packet => new ProducerRecord[Integer, Packet]("packets", packet.key.##, packet))
.map( pr => new ProducerMessage.Message[Integer, Packet, Unit](pr, ()))
.via(Producer.flow(producerSettings))
.runWith(Sink.foreach(println))
try {
Await.ready(doneF, 10 seconds)
}
finally {
system.terminate()
}
}
}
开发者ID:robertson-tech,项目名称:wirefugue,代码行数:53,代码来源:Main.scala
示例4: running
//设置package包名称以及导入依赖的类
package producers
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.{Done, NotUsed}
import broker.ActorBroker
import config.AppConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.Future
trait Producerable extends ActorBroker {
val config: AppConfig
implicit val materializer = ActorMaterializer()
val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${config.kafkaConfig.uri}:${config.kafkaConfig.port}")
def running(): Receive = {
case Stop =>
log.info("Stopping Kafka producer stream and actor")
context.stop(self)
}
def sendToSink(message: String): Unit = {
log.info(s"Attempting to produce message on topic $topicName")
val kafkaSink = Producer.plainSink(producerSettings)
val stringToProducerRecord: ProducerRecord[Array[Byte], String] = new ProducerRecord[Array[Byte], String](topicName, message)
val (a, future): (NotUsed, Future[Done]) = Source.fromFuture(Future(stringToProducerRecord))
.toMat(kafkaSink)(Keep.both)
.run()
future.onFailure {
case ex =>
log.error("Stream failed due to error, restarting", ex)
throw ex
}
context.become(running())
log.info(s"Writer now running, writing random numbers to topic $topicName")
}
case object Stop
}
开发者ID:jguido,项目名称:reactive-kafka-registration,代码行数:48,代码来源:Producerable.scala
示例5: Main
//设置package包名称以及导入依赖的类
import java.util.concurrent.TimeUnit.SECONDS
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.duration.FiniteDuration
import scala.language.postfixOps
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem.apply("akka-stream-kafka")
implicit val materializer = ActorMaterializer()
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092;localhost:9093")
Source.repeat(0)
.scan(0)((next, _) => next + 1)
.throttle(1, FiniteDuration(2L, SECONDS), 1, ThrottleMode.Shaping)
.map(nextInt => {
val topicName = "topic1"
val partitionCount = 2
val partition = nextInt % partitionCount
new ProducerRecord[Array[Byte], String](topicName, nextInt.toString.getBytes, nextInt.toString)
// new ProducerRecord[Array[Byte], String](topicName, partition, null, nextInt.toString)
})
.runWith(Producer.plainSink(producerSettings))
}
}
开发者ID:kczulko,项目名称:akka-streams-kafka,代码行数:37,代码来源:Main.scala
示例6: self
//设置package包名称以及导入依赖的类
package com.omearac.producers
import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import com.omearac.shared.JsonMessageConversion.Conversion
import com.omearac.shared.{AkkaStreams, EventSourcing}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
trait ProducerStream extends AkkaStreams with EventSourcing {
implicit val system: ActorSystem
def self: ActorRef
def createStreamSource[msgType] = {
Source.queue[msgType](Int.MaxValue,OverflowStrategy.backpressure)
}
def createStreamSink(producerProperties: Map[String, String]) = {
val kafkaMBAddress = producerProperties("bootstrap-servers")
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer).withBootstrapServers(kafkaMBAddress)
Producer.plainSink(producerSettings)
}
def createStreamFlow[msgType: Conversion](producerProperties: Map[String, String]) = {
val numberOfPartitions = producerProperties("num.partitions").toInt -1
val topicToPublish = producerProperties("publish-topic")
val rand = new scala.util.Random
val range = 0 to numberOfPartitions
Flow[msgType].map { msg =>
val partition = range(rand.nextInt(range.length))
val stringJSONMessage = Conversion[msgType].convertToJson(msg)
new ProducerRecord[Array[Byte], String](topicToPublish, partition, null, stringJSONMessage)
}
}
}
开发者ID:omearac,项目名称:reactive-kafka-microservice-template,代码行数:43,代码来源:ProducerStream.scala
示例7: ReactiveKafkaSingleConsumerMultipleProducerScala
//设置package包名称以及导入依赖的类
package org.rgcase.reactivekafka
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffsetBatch }
import akka.kafka.ProducerMessage.Message
import akka.kafka.scaladsl.{ Consumer, Producer }
import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer }
class ReactiveKafkaSingleConsumerMultipleProducerScala extends App {
implicit val system = ActorSystem("reactivekafkascala")
implicit val mat = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9093")
val kafkaSource =
Consumer.committableSource(consumerSettings, Subscriptions.topics("sourcetopic"))
def toProducerMessage(topic: String) = (msg: CommittableMessage[Array[Byte], String]) ?
Message[Array[Byte], String, CommittableMessage[Array[Byte], String]](new ProducerRecord(topic, msg.record.value), msg)
val producerFlow1 =
Flow.fromFunction(toProducerMessage("targettopic1")).via(Producer.flow(producerSettings)).map(_.message.passThrough)
val producerFlow2 =
Flow.fromFunction(toProducerMessage("targettopic2")).via(Producer.flow(producerSettings)).map(_.message.passThrough)
val producerFlow3 =
Flow.fromFunction(toProducerMessage("targettopic3")).via(Producer.flow(producerSettings)).map(_.message.passThrough)
kafkaSource
.via(producerFlow1)
.via(producerFlow2)
.via(producerFlow3)
.batch(max = 20, first ? CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) ?
batch.updated(elem.committableOffset)
}.mapAsync(3)(_.commitScaladsl())
.runWith(Sink.ignore)
}
开发者ID:rgcase,项目名称:testplayground,代码行数:52,代码来源:ReactiveKafkaSingleConsumerMultipleProducerScala.scala
示例8: ReactiveProducer
//设置package包名称以及导入依赖的类
package co.s4n.reactiveKafka
import akka.actor.ActorSystem
import akka.kafka.ProducerMessage
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import akka.Done
import scala.util.{ Failure, Success }
object ReactiveProducer {
val system = ActorSystem("example")
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer.create(system)
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val kafkaProducer = producerSettings.createKafkaProducer()
def produce(msg: String): Unit = {
val done = Source(1 to 1)
.map(_.toString)
.map { elem =>
println("\n" + msg);
new ProducerRecord[Array[Byte], String]("UsersTopic", msg)
}
.runWith(Producer.plainSink(producerSettings, kafkaProducer))
// #plainSinkWithProducer
// terminateWhenDone(done)
}
def terminateWhenDone(result: Future[Done]): Unit = {
result.onComplete {
case Failure(e) =>
system.log.error(e, e.getMessage)
system.terminate()
case Success(_) => system.terminate()
}
}
}
开发者ID:bazzo03,项目名称:users-api,代码行数:51,代码来源:ReactiveProducer.scala
示例9: FlowProducerMain
//设置package包名称以及导入依赖的类
package com.example.producer
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Producer
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
object FlowProducerMain extends App {
implicit val system = ActorSystem("FlowProducerMain")
implicit val materializer = ActorMaterializer()
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val done = Source(1 to 10)
.map { n =>
// val partition = math.abs(n) % 2
val partition = 0
ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
"topic1", partition, null, n.toString
), n)
}
.via(Producer.flow(producerSettings))
.map { result =>
val record = result.message.record
println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value}" +
s"(${result.message.passThrough})")
result
}
.runWith(Sink.ignore)
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:36,代码来源:FlowProducerMain.scala
示例10: CommitConsumerToFlowProducerMain
//设置package包名称以及导入依赖的类
package com.example.producer
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
object CommitConsumerToFlowProducerMain extends App {
implicit val system = ActorSystem("CommitConsumerToFlowProducerMain")
implicit val materializer = ActorMaterializer()
val consumerSettings =
ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("CommitConsumerToFlowProducer")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val done =
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.map { msg =>
println(s"topic1 -> topic2: $msg")
ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
"topic2",
msg.record.value
), msg.committableOffset)
}
.via(Producer.flow(producerSettings))
.mapAsync(producerSettings.parallelism) { result =>
result.message.passThrough.commitScaladsl()
}
.runWith(Sink.ignore)
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:41,代码来源:CommitConsumerToFlowProducerMain.scala
示例11: PlainSinkProducerMain
//设置package包名称以及导入依赖的类
package com.example.producer
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
object PlainSinkProducerMain extends App {
implicit val system = ActorSystem("PlainSinkProducerMain")
implicit val materializer = ActorMaterializer()
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val done = Source(1 to 10)
.map(_.toString)
.map { elem =>
println(s"PlainSinkProducer produce: ${elem}")
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.runWith(Producer.plainSink(producerSettings))
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:29,代码来源:PlainSinkProducerMain.scala
示例12: ConsumerToCommitableSinkProducerMain
//设置package包名称以及导入依赖的类
package com.example.producer
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
object ConsumerToCommitableSinkProducerMain extends App {
implicit val system = ActorSystem("Consumer2ProducerMain")
implicit val materializer = ActorMaterializer()
//TODO: move to configuration application.conf
val consumerSettings =
ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("Consumer2Producer")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
//TODO: move to configuration application.conf
val producerSettings =
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.map { msg =>
println(s"topic1 -> topic2: $msg")
ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
"topic2",
msg.record.value
), msg.committableOffset)
}
.runWith(Producer.commitableSink(producerSettings))
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:42,代码来源:ConsumerToCommitableSinkProducerMain.scala
示例13: KafkaPushQueue
//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.manager
import akka.kafka.ProducerMessage.Message
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.kafka.clients.producer.ProducerRecord
import reactivehub.akka.stream.apns.pusher.PushData
import scala.concurrent.Future
class KafkaPushQueue(topic: String, settings: ProducerSettings[Long, PushData])
extends PushQueue {
override def pushDataSink: Sink[(Long, PushData), Future[Long]] =
Flow[(Long, PushData)]
.map {
case (key, value) => Message(
new ProducerRecord[Long, PushData]("notifications", key, value), key)
}
.via(Producer.flow(settings))
.toMat(Sink.fold(0L)({ case (acc, _) => acc + 1}))(Keep.right)
}
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:23,代码来源:KafkaPushQueue.scala
示例14: BaseStation2Kafka
//设置package包名称以及导入依赖的类
package com.darienmt.airplaneadventures.basestation.collector.streams
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer }
import scala.concurrent.Future
import io.circe._
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.syntax._
import io.circe.java8.time.{ encodeLocalDateDefault, encodeZonedDateTimeDefault }
import com.darienmt.airplaneadventures.basestation.collector.parsing.CirceEncoders._
object BaseStation2Kafka {
case class SourceConfig(address: String, port: Int)
case class SinkConfig(address: String, port: Int, topic: String)
def apply(sourceConfig: SourceConfig, sinkConfig: SinkConfig)(implicit system: ActorSystem, materializer: ActorMaterializer): Future[Done] =
BaseStationSource(sourceConfig.address, sourceConfig.port)
.map(_.asJson.noSpaces)
.map(m => new ProducerRecord[Array[Byte], String](sinkConfig.topic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${sinkConfig.address}:${sinkConfig.port}")
)
)
}
开发者ID:darienmt,项目名称:airplane-adventures,代码行数:35,代码来源:BaseStation2Kafka.scala
示例15: Main
//设置package包名称以及导入依赖的类
package com.darienmt.airplaneadventures.basestation.rawcollector
import java.net.InetSocketAddress
import akka.Done
import akka.io.Inet.SocketOption
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{ Framing, Source, Tcp }
import akka.util.ByteString
import com.darienmt.keepers.{ Generator, KeepThisUp, MainCommons }
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer }
import scala.collection.immutable
import scala.collection.immutable.IndexedSeq
import scala.util.Success
import scala.concurrent.duration._
object Main extends App with MainCommons {
implicit def asFiniteDuration(d: java.time.Duration): FiniteDuration =
scala.concurrent.duration.Duration.fromNanos(d.toNanos)
val bsAddress = config.getString("station.address")
val bsPort = config.getInt("station.port")
val connectTimeout: Duration = config.getDuration("station.connectTimeout")
val idleTimeout: Duration = config.getDuration("station.idleTimeout")
val kafkaAddress = config.getString("kafka.address")
val kafkaPort = config.getInt("kafka.port")
val kafkaTopic = config.getString("kafka.topic")
val generator: Generator = () => Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(
remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
connectTimeout = connectTimeout,
idleTimeout = idleTimeout
)
.via(Framing.delimiter(ByteString("\n"), 256))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
)
val keeper = KeepThisUp(config)
keeper(generator)
}
开发者ID:darienmt,项目名称:airplane-adventures,代码行数:56,代码来源:Main.scala
示例16: produceRecord
//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka.testkit
import java.util.UUID
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import scala.concurrent.Future
trait KafkaTest {
def produceRecord[K, V](topic: String, keySerializer: Serializer[K], valueSerializer: Serializer[V], key: K, value: V)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
val producerSettings = ProducerSettings(system, keySerializer, valueSerializer)
.withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
Source.single(new ProducerRecord("mail.command.send", key, value))
.toMat(Producer.plainSink(producerSettings))(Keep.right)
.run()
}
def consumeRecordPF[K, V, Out](topic: String, keyDeserializer: Deserializer[K], valueDeserializer: Deserializer[V])(pf: PartialFunction[ConsumerRecord[K, V], Out])(implicit system: ActorSystem, mat: Materializer): Future[Out] = {
val consumerSettings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
.withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
.withGroupId(UUID.randomUUID.toString)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
.filter(pf.isDefinedAt)
.map(pf)
.toMat(Sink.head)(Keep.right)
.run()
}
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:40,代码来源:KafkaTest.scala
示例17: KafkaDao
//设置package包名称以及导入依赖的类
package dao
import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer.Control
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{
ConsumerSettings,
ProducerSettings,
Subscriptions
}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{
ByteArrayDeserializer,
ByteArraySerializer,
StringDeserializer,
StringSerializer
}
import play.api.libs.json.Format
import [email protected]@
class KafkaDao[Event](val topic: String @@ Event)(implicit mat: Materializer,
sys: ActorSystem,
format: Format[Event])
extends EventDao[Event] {
def producerSettings =
ProducerSettings(sys,
new ByteArraySerializer(),
new StringSerializer())
.withBootstrapServers("localhost:9092")
private def kafkaIn: Sink[String, NotUsed] =
Flow[String]
.map { elem =>
new ProducerRecord[Array[Byte], String](topic,
elem)
}
.to(Producer.plainSink(producerSettings))
private def consumerSettings =
ConsumerSettings(sys,
new ByteArrayDeserializer,
new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
private def kafkaOut: Source[String, Control] =
Consumer
.plainSource(consumerSettings,
Subscriptions.topics(topic))
.map(_.value())
override protected def eventStore
: Flow[String, String, NotUsed] =
Flow.fromSinkAndSource(kafkaIn, kafkaOut)
}
开发者ID:leanovate,项目名称:contoso-conference-manager,代码行数:60,代码来源:KafkaDao.scala
示例18: KafkaSink
//设置package包名称以及导入依赖的类
package com.thenetcircle.event_dispatcher.sink
import java.util.concurrent.ConcurrentHashMap
import akka.NotUsed
import akka.kafka.ProducerMessage
import akka.kafka.ProducerMessage.Message
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{ Flow, Keep }
import com.thenetcircle.event_dispatcher.Event
import com.thenetcircle.event_dispatcher.driver.adapter.KafkaSinkAdapter
import com.thenetcircle.event_dispatcher.driver.extractor.Extractor
import com.thenetcircle.event_dispatcher.driver.{ KafkaKey, KafkaValue }
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }
object KafkaSink {
lazy private val producerList = new ConcurrentHashMap[String, KafkaProducer[KafkaKey, KafkaValue]]
def apply(
settings: KafkaSinkSettings
): Flow[Event, ProducerMessage.Result[KafkaKey, KafkaValue, NotUsed.type], NotUsed] = {
val producerName = settings.name
val producerSettings = settings.producerSettings
val producer: KafkaProducer[KafkaKey, KafkaValue] = if (producerList.containsKey(producerName)) {
producerList.get(producerName)
} else {
producerSettings.createKafkaProducer()
}
val kafkaProducerFlow = Flow[ProducerRecord[KafkaKey, KafkaValue]]
.map(Message(_, NotUsed))
.via(Producer.flow(settings.producerSettings, producer))
Flow[Event]
.map(Extractor.deExtract)
.map(KafkaSinkAdapter.unfit)
.viaMat(kafkaProducerFlow)(Keep.left)
}
}
开发者ID:thenetcircle,项目名称:event-dispatcher,代码行数:43,代码来源:KafkaSink.scala
示例19: KafkaEventProcessor
//设置package包名称以及导入依赖的类
package events
import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import com.google.inject.Inject
import events.KafkaEventProcessor.Init
import events.Serializer.EventSerializer
import models.KafkaEvents.{Event, Hello}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import play.api.{Configuration, Logger}
import scala.concurrent.ExecutionContextExecutor
class KafkaEventProcessor @Inject()(config: Configuration) extends Actor with ActorLogging {
private val eventStream = context.system.eventStream
implicit val materializer: ActorMaterializer = ActorMaterializer()(context.system)
implicit val ctx: ExecutionContextExecutor = context.dispatcher
private val server = config.getString("kafka.broker").getOrElse("localhost:9092")
private val topic = config.getString("kafka.topic").getOrElse("test")
override def preStart(): Unit = {
super.preStart()
self ! Init
log.info("Start EventsProcessorActor")
}
override def postStop(): Unit = {
eventStream.unsubscribe(self)
super.postStop()
}
override def receive = {
case Init => createProducer()
}
private def createProducer() = {
val producerSettings = ProducerSettings(context.system, new ByteArraySerializer(), new EventSerializer())
.withBootstrapServers(server)
val jobManagerSource = Source.actorPublisher[Event](Props(classOf[KafkaEventPublisher]))
Flow[Event].map {
case e: Hello => new ProducerRecord[Array[Byte], Event](topic, e)
}.to(Producer.plainSink(producerSettings))
.runWith(jobManagerSource)
Logger.info("init producer")
}
}
object KafkaEventProcessor {
case object Init
}
开发者ID:fsanaulla,项目名称:Akka-Kafka-Producer,代码行数:61,代码来源:KafkaEventProcessor.scala
示例20: self
//设置package包名称以及导入依赖的类
package com.omearac.producers
import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import com.omearac.shared.ProtobufMessageConversion.Conversion
import com.omearac.shared.{AkkaStreams, EventSourcing}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
trait ProducerStream extends AkkaStreams with EventSourcing {
implicit val system: ActorSystem
def self: ActorRef
def createStreamSource[msgType] = {
Source.queue[msgType](Int.MaxValue,OverflowStrategy.backpressure)
}
def createStreamSink(producerProperties: Map[String, String]) = {
val kafkaMBAddress = producerProperties("bootstrap-servers")
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer).withBootstrapServers(kafkaMBAddress)
Producer.plainSink(producerSettings)
}
def createStreamFlow[msgType: Conversion](producerProperties: Map[String, String]) = {
val numberOfPartitions = producerProperties("num.partitions").toInt -1
val topicToPublish = producerProperties("publish-topic")
val rand = new scala.util.Random
val range = 0 to numberOfPartitions
Flow[msgType].map { msg =>
val partition = range(rand.nextInt(range.length))
val stringJSONMessage = Conversion[msgType].convertToByteArray(msg)
new ProducerRecord[Array[Byte], Array[Byte]](topicToPublish, partition, null, stringJSONMessage)
}
}
}
开发者ID:omearac,项目名称:reactive-kafka-microservice-protobuf,代码行数:42,代码来源:ProducerStream.scala
注:本文中的akka.kafka.scaladsl.Producer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论