本文整理汇总了Scala中org.apache.kafka.clients.producer.ProducerRecord类的典型用法代码示例。如果您正苦于以下问题:Scala ProducerRecord类的具体用法?Scala ProducerRecord怎么用?Scala ProducerRecord使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ProducerRecord类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ProcessingKafkaApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.duration._
object ProcessingKafkaApplication extends App {
implicit val actorSystem = ActorSystem("SimpleStream")
implicit val actorMaterializer = ActorMaterializer()
val bootstrapServers = "localhost:9092"
val kafkaTopic = "akka_streams_topic"
val partition = 0
val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))
val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("akka_streams_group")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val tickSource = Source.tick(0 seconds, 5 seconds, "Hello from Akka Streams using Kafka!")
val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
val kafkaSink = Producer.plainSink(producerSettings)
val printlnSink = Sink.foreach(println)
val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())
tickSource ~> mapToProducerRecord ~> kafkaSink
kafkaSource ~> mapFromConsumerRecord ~> printlnSink
ClosedShape
})
runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala
示例2: SimpleKafkaProducer
//设置package包名称以及导入依赖的类
package com.example
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.json4s.{DefaultFormats, jackson}
class SimpleKafkaProducer(kafkaSocket: Socket, topic: String, brokers: Int = 1) {
private val serializer = "org.apache.kafka.common.serialization.StringSerializer"
private def configuration = {
val props = new Properties()
props.put("bootstrap.servers", kafkaSocket.toString())
props.put("key.serializer", serializer)
props.put("value.serializer", serializer)
props
}
def send[T <: AnyRef](message: T) = {
implicit val serialization = jackson.Serialization
implicit val formats = DefaultFormats
val producer = new KafkaProducer[String, String](configuration)
val jsonMessage = serialization.write[T](message)
val data = new ProducerRecord[String, String](topic, jsonMessage)
producer.send(data)
producer.close()
}
}
开发者ID:frossi85,项目名称:financial-statistics-crawler,代码行数:31,代码来源:SimpleKafkaProducer.scala
示例3: sendToKafkaWithNewProducer
//设置package包名称以及导入依赖的类
package pl.touk.nussknacker.engine.kafka
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}
trait EspSimpleKafkaProducer {
val kafkaConfig: KafkaConfig
def sendToKafkaWithNewProducer(topic: String, key: Array[Byte], value: Array[Byte]): Future[RecordMetadata] = {
var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
try {
producer = createProducer()
sendToKafka(topic, key, value)(producer)
} finally {
if (producer != null) {
producer.close()
}
}
}
//method with such signature already exists in "net.cakesolutions" %% "scala-kafka-client" % "0.9.0.0" but I struggled to add this dependency...
def sendToKafka(topic: String, key: Array[Byte], value: Array[Byte])(producer: KafkaProducer[Array[Byte], Array[Byte]]): Future[RecordMetadata] = {
val promise = Promise[RecordMetadata]()
producer.send(new ProducerRecord(topic, key, value), producerCallback(promise))
promise.future
}
def createProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
new KafkaProducer[Array[Byte], Array[Byte]](KafkaEspUtils.toProducerProperties(kafkaConfig))
}
private def producerCallback(promise: Promise[RecordMetadata]): Callback =
new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
val result = if (exception == null) Success(metadata) else Failure(exception)
promise.complete(result)
}
}
}
开发者ID:TouK,项目名称:nussknacker,代码行数:41,代码来源:EspSimpleKafkaProducer.scala
示例4: ReadyKafkaProducer
//设置package包名称以及导入依赖的类
package com.bencassedy.readykafka.producer
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
class ReadyKafkaProducer {
case class KafkaProducerConfigs(brokerList: String = "127.0.0.1:9092") {
val properties = new Properties()
properties.put("bootstrap.servers", brokerList)
properties.put("key.serializer", classOf[StringSerializer])
properties.put("value.serializer", classOf[StringSerializer])
// properties.put("serializer.class", classOf[StringDeserializer])
// properties.put("batch.size", 16384)
// properties.put("linger.ms", 1)
// properties.put("buffer.memory", 33554432)
}
val producer = new KafkaProducer[String, String](KafkaProducerConfigs().properties)
def produce(topic: String, messages: Iterable[String]): Unit = {
messages.foreach { m =>
producer.send(new ProducerRecord[String, String](topic, m))
}
producer.close(100L, TimeUnit.MILLISECONDS)
}
}
开发者ID:bencassedy,项目名称:ready-kafka,代码行数:31,代码来源:ReadyKafkaProducer.scala
示例5: Application
//设置package包名称以及导入依赖的类
import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
import java.util.{Date, Properties}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object Application extends App {
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val simIDs = 10000 to 99999 //99000
val brokers = "192.168.100.211:6667,192.168.100.212:6667,192.168.100.213:6667";
val topic = "newTest";
val props = new Properties
props.put("bootstrap.servers", brokers)
props.put("client.id", "Producer")
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[Integer, String](props)
while (true) {
for (simID <- simIDs) {
val data = Data(
"64846867247",
"?D" + simID,
formatter.format(new Date()),
121.503,
31.3655,
78,
0,
42,
52806.7
)
// println(Data.getString(data))
producer.send(new ProducerRecord[Integer, String](topic, Data.getString(data)))
// TimeUnit.NANOSECONDS.sleep(100)
}
println("-------------------------------"+new Date())
TimeUnit.MINUTES.sleep(18)
}
}
开发者ID:qiuwsh,项目名称:dataSimulator,代码行数:43,代码来源:Application.scala
示例6: Generator
//设置package包名称以及导入依赖的类
package data.processing.kafkagenerator
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import com.github.andr83.scalaconfig._
import com.yammer.metrics.core.{MetricName, MetricsRegistry}
import data.processing.avro.AvroEncoder
import scala.concurrent.forkjoin.ThreadLocalRandom
object Generator {
val metricsRegistry = new MetricsRegistry
val config = ConfigFactory.load()
val props = config.getConfig("kafka-client").as[Properties]
val topic = config.getString("kafka-client.topic")
val numberOfUsers = config.getInt("generator.number.of.users")
val urls = config.getStringList("generator.urls")
val eventTypes = config.getStringList("generator.event.types")
val throughput = config.getInt("generator.throughput")
val avroEncoder = new AvroEncoder("/event-record.json")
def generateEvent() = {
val id = ThreadLocalRandom.current().nextLong()
val ts = java.lang.System.currentTimeMillis()
val userId = ThreadLocalRandom.current().nextInt(numberOfUsers).toHexString
val url = urls.get(ThreadLocalRandom.current().nextInt(urls.size()))
val eventType = eventTypes.get(ThreadLocalRandom.current().nextInt(eventTypes.size()))
(id, avroEncoder.encode((id, ts, userId, url, eventType)))
}
def main(args: Array[String]): Unit = {
val meter = metricsRegistry.newMeter(new MetricName("", "", ""), "", TimeUnit.SECONDS)
val producer = new KafkaProducer[String, Array[Byte]](props)
while(true) {
if (meter.meanRate < throughput) {
meter.mark()
val event = generateEvent()
producer.send(new ProducerRecord[String, Array[Byte]](topic, event._1.toString, event._2))
}
else {
Thread.sleep(1)
}
}
producer.flush()
producer.close()
}
}
开发者ID:ipogudin,项目名称:data-processing-examples,代码行数:56,代码来源:Generator.scala
示例7: ProducerExample
//设置package包名称以及导入依赖的类
package edu.uw.at.iroberts.wirefugue.kafka.producer
import java.util.Properties
import akka.util.ByteString
import edu.uw.at.iroberts.wirefugue.pcap.IPAddress
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord}
class ProducerExample {
val props: Properties = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("acks", "all")
props.put("retries", 0.asInstanceOf[java.lang.Integer])
props.put("batch.size", 16384.asInstanceOf[java.lang.Integer])
props.put("linger.ms", 1.asInstanceOf[java.lang.Integer])
props.put("buffer.memory", 33554432.asInstanceOf[java.lang.Integer])
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
type IPProto = Byte
type Port = Short
type ProducerKey = (IPProto, IPAddress, Port, IPAddress, Port)
type PacketProducerRecord = ProducerRecord[ProducerKey, ByteString]
val producer: Producer[ProducerKey, Array[Byte]] = new KafkaProducer(props)
for (i <- 0 until 100) {
val key: ProducerKey = (4.toByte, IPAddress("192.168.0.1"), 25563.toShort, IPAddress("192.168.0.2"), 80.toShort)
val someByteString: ByteString = ???
val value: Array[Byte] = someByteString.toArray
producer.send(new ProducerRecord[ProducerKey, Array[Byte]]("ipv4-packets", key, value))
}
producer.close()
}
开发者ID:robertson-tech,项目名称:wirefugue,代码行数:36,代码来源:ProducerExample.scala
示例8: SimpleProducer
//设置package包名称以及导入依赖的类
package producers
import java.util.Properties
import model.Employee
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object SimpleProducer extends App{
lazy val producer: KafkaProducer[String, String] = new KafkaProducer(getKafkaConfigProperties)
lazy val testEmpObjects:List[Employee] = (0 to 1000).map(x=>Employee("John"+x, x)).toList
testEmpObjects.foreach { emp =>
producer.send(new ProducerRecord[String, String]("raw-data-1", emp.id.toString, Employee.asJson(emp)))
}
def getKafkaConfigProperties: Properties = {
val config = new Properties()
config.put("bootstrap.servers", "localhost:9092")
config.put("group.id", "group1")
config.put("client.id", "client1")
config.put("enable.auto.commit", "true")
config.put("session.timeout.ms", "10000")
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
config
}
}
开发者ID:ajit-scala,项目名称:kafka-consumers,代码行数:34,代码来源:SimpleProducer.scala
示例9: KafkaFeedsExporter
//设置package包名称以及导入依赖的类
package ru.fediq.scrapingkit.backend
import cakesolutions.kafka.KafkaProducer
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import ru.fediq.scrapingkit.scraper.ScrapedEntity
import scala.concurrent.Future
class KafkaFeedsExporter(
val bootstrapServer: String,
val topic: String
) extends FeedExporter {
val producer = KafkaProducer(KafkaProducer.Conf(new StringSerializer(), new StringSerializer, bootstrapServer))
override def store[T <: ScrapedEntity](entity: T): Future[RecordMetadata] = {
producer.send(new ProducerRecord(topic, entity.dump))
}
override def close() = producer.close()
}
开发者ID:fediq,项目名称:scraping-kit,代码行数:22,代码来源:KafkaFeedsExporter.scala
示例10: DataProducerThread
//设置package包名称以及导入依赖的类
package org.hpi.esb.datasender
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.hpi.esb.commons.util.Logging
class DataProducerThread(dataProducer: DataProducer, kafkaProducer: KafkaProducer[String, String],
dataReader: DataReader, topics: List[String], singleColumnMode: Boolean,
duration: Long, durationTimeUnit: TimeUnit) extends Runnable with Logging {
var numberOfRecords: Int = 0
val startTime: Long = currentTime
val endTime: Long = startTime + durationTimeUnit.toMillis(duration)
def currentTime: Long = System.currentTimeMillis()
def run() {
if (currentTime < endTime) {
send(dataReader.readRecords)
} else {
logger.info(s"Shut down after $durationTimeUnit: $duration.")
dataProducer.shutDown()
}
}
def send(messagesOption: Option[List[String]]): Unit = {
messagesOption.foreach(messages => {
numberOfRecords += 1
if (singleColumnMode) {
sendSingleColumn(messages)
} else {
sendMultiColumns(messages)
}
})
}
def sendSingleColumn(messages: List[String]): Unit = {
val message = messages.head
topics.foreach(
topic => {
sendToKafka(topic = topic, message = message)
})
}
def sendToKafka(topic: String, message: String): Unit = {
val record = new ProducerRecord[String, String](topic, message)
kafkaProducer.send(record)
logger.debug(s"Sent value $message to topic $topic.")
}
def sendMultiColumns(messages: List[String]): Unit = {
messages.zip(topics)
.foreach {
case (message, topic) =>
sendToKafka(topic = topic, message = message)
}
}
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:60,代码来源:DataProducerThread.scala
示例11: OrderProcessingService
//设置package包名称以及导入依赖的类
package com.github.simonthecat.eventdrivenorders.orderservice
import java.util
import domain.Order
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.collection.JavaConversions._
import scala.util.Try
class OrderProcessingService(orderConsumer: KafkaConsumer[String, String],
orderConsumerTopic: String,
storeUpdateProducer: KafkaProducer[String, String],
storeUpdateTopic: String) {
import com.owlike.genson.defaultGenson._
var running = true
def start() = {
orderConsumer.subscribe(util.Arrays.asList(orderConsumerTopic))
while (running) {
val records = orderConsumer.poll(100)
records.iterator().foreach(processOrder)
}
}
def processOrder(record: ConsumerRecord[String, String]): Unit = {
println(s"Processing ${record.value()}")
for {
order <- Try(fromJson[Order](record.value()))
_ <- Try {
println(s"Sending to store service: $order")
storeUpdateProducer.send(new ProducerRecord[String, String](storeUpdateTopic, toJson(order)))
}
} yield Unit
println(s"Processing ${record.value()}")
}
def stop() = {
orderConsumer.close()
running = false
}
}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:51,代码来源:OrderProcessingService.scala
示例12: ConfirmationService
//设置package包名称以及导入依赖的类
package com.github.simonthecat.eventdrivenorders.orderservice
import java.util
import domain.{OrderStatus, UpdateStoreStatus}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Try
class ConfirmationService(confirmationConsumer: KafkaConsumer[String, String],
confirmationTopic: String,
replyProducer: KafkaProducer[String, String],
replyTopic: String) {
import com.owlike.genson.defaultGenson._
var running = true
def start() = {
confirmationConsumer.subscribe(util.Arrays.asList(confirmationTopic))
Future {
while (running) {
val records = confirmationConsumer.poll(100)
records.iterator().foreach(processConfirmation)
}
}.recover {
case ex => ex.printStackTrace()
}
}
def processConfirmation(record: ConsumerRecord[String, String]): Unit = {
println(s"Processing ${record.value()}")
for {
status <- Try(fromJson[UpdateStoreStatus](record.value()))
_ <- Try {
println(s"Replying $status")
replyProducer.send(new ProducerRecord(replyTopic, toJson(OrderStatus(status.orderId, status.success))))
}
} yield Unit
println(s"Processed ${record.value()}")
}
def stop() = {
confirmationConsumer.close()
running = false
}
}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:55,代码来源:ConfirmationService.scala
示例13: OrderService
//设置package包名称以及导入依赖的类
package com.github.eventdrivenorders.api
import domain.Order
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import com.github.eventdrivenorders.api.json.OrderFormats._
import spray.json._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
class OrderService(orderProducer: KafkaProducer[String, String],
orderTopic: String,
orderStatusService: OrderStatusService) {
def submit(order: Order) = Future {
println(s"Submitting order: $order")
val orderJson = order.toJson.toString()
// orderStatusService.addPending(order.id)
orderProducer.send(new ProducerRecord[String, String](orderTopic, orderJson))
}
}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:23,代码来源:OrderService.scala
示例14: running
//设置package包名称以及导入依赖的类
package producers
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.{Done, NotUsed}
import broker.ActorBroker
import config.AppConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.Future
trait Producerable extends ActorBroker {
val config: AppConfig
implicit val materializer = ActorMaterializer()
val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${config.kafkaConfig.uri}:${config.kafkaConfig.port}")
def running(): Receive = {
case Stop =>
log.info("Stopping Kafka producer stream and actor")
context.stop(self)
}
def sendToSink(message: String): Unit = {
log.info(s"Attempting to produce message on topic $topicName")
val kafkaSink = Producer.plainSink(producerSettings)
val stringToProducerRecord: ProducerRecord[Array[Byte], String] = new ProducerRecord[Array[Byte], String](topicName, message)
val (a, future): (NotUsed, Future[Done]) = Source.fromFuture(Future(stringToProducerRecord))
.toMat(kafkaSink)(Keep.both)
.run()
future.onFailure {
case ex =>
log.error("Stream failed due to error, restarting", ex)
throw ex
}
context.become(running())
log.info(s"Writer now running, writing random numbers to topic $topicName")
}
case object Stop
}
开发者ID:jguido,项目名称:reactive-kafka-registration,代码行数:48,代码来源:Producerable.scala
示例15: Main
//设置package包名称以及导入依赖的类
import java.util.concurrent.TimeUnit.SECONDS
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.duration.FiniteDuration
import scala.language.postfixOps
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem.apply("akka-stream-kafka")
implicit val materializer = ActorMaterializer()
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092;localhost:9093")
Source.repeat(0)
.scan(0)((next, _) => next + 1)
.throttle(1, FiniteDuration(2L, SECONDS), 1, ThrottleMode.Shaping)
.map(nextInt => {
val topicName = "topic1"
val partitionCount = 2
val partition = nextInt % partitionCount
new ProducerRecord[Array[Byte], String](topicName, nextInt.toString.getBytes, nextInt.toString)
// new ProducerRecord[Array[Byte], String](topicName, partition, null, nextInt.toString)
})
.runWith(Producer.plainSink(producerSettings))
}
}
开发者ID:kczulko,项目名称:akka-streams-kafka,代码行数:37,代码来源:Main.scala
示例16: self
//设置package包名称以及导入依赖的类
package com.omearac.producers
import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import com.omearac.shared.JsonMessageConversion.Conversion
import com.omearac.shared.{AkkaStreams, EventSourcing}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
trait ProducerStream extends AkkaStreams with EventSourcing {
implicit val system: ActorSystem
def self: ActorRef
def createStreamSource[msgType] = {
Source.queue[msgType](Int.MaxValue,OverflowStrategy.backpressure)
}
def createStreamSink(producerProperties: Map[String, String]) = {
val kafkaMBAddress = producerProperties("bootstrap-servers")
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer).withBootstrapServers(kafkaMBAddress)
Producer.plainSink(producerSettings)
}
def createStreamFlow[msgType: Conversion](producerProperties: Map[String, String]) = {
val numberOfPartitions = producerProperties("num.partitions").toInt -1
val topicToPublish = producerProperties("publish-topic")
val rand = new scala.util.Random
val range = 0 to numberOfPartitions
Flow[msgType].map { msg =>
val partition = range(rand.nextInt(range.length))
val stringJSONMessage = Conversion[msgType].convertToJson(msg)
new ProducerRecord[Array[Byte], String](topicToPublish, partition, null, stringJSONMessage)
}
}
}
开发者ID:omearac,项目名称:reactive-kafka-microservice-template,代码行数:43,代码来源:ProducerStream.scala
示例17: ReactiveKafkaSingleConsumerMultipleProducerScala
//设置package包名称以及导入依赖的类
package org.rgcase.reactivekafka
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffsetBatch }
import akka.kafka.ProducerMessage.Message
import akka.kafka.scaladsl.{ Consumer, Producer }
import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer }
class ReactiveKafkaSingleConsumerMultipleProducerScala extends App {
implicit val system = ActorSystem("reactivekafkascala")
implicit val mat = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9093")
val kafkaSource =
Consumer.committableSource(consumerSettings, Subscriptions.topics("sourcetopic"))
def toProducerMessage(topic: String) = (msg: CommittableMessage[Array[Byte], String]) ?
Message[Array[Byte], String, CommittableMessage[Array[Byte], String]](new ProducerRecord(topic, msg.record.value), msg)
val producerFlow1 =
Flow.fromFunction(toProducerMessage("targettopic1")).via(Producer.flow(producerSettings)).map(_.message.passThrough)
val producerFlow2 =
Flow.fromFunction(toProducerMessage("targettopic2")).via(Producer.flow(producerSettings)).map(_.message.passThrough)
val producerFlow3 =
Flow.fromFunction(toProducerMessage("targettopic3")).via(Producer.flow(producerSettings)).map(_.message.passThrough)
kafkaSource
.via(producerFlow1)
.via(producerFlow2)
.via(producerFlow3)
.batch(max = 20, first ? CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) ?
batch.updated(elem.committableOffset)
}.mapAsync(3)(_.commitScaladsl())
.runWith(Sink.ignore)
}
开发者ID:rgcase,项目名称:testplayground,代码行数:52,代码来源:ReactiveKafkaSingleConsumerMultipleProducerScala.scala
示例18: KProducer
//设置package包名称以及导入依赖的类
package org.parsec
import java.util.Properties
import java.util.concurrent.Future
import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
// Convenient Kafka producer using avro4s
class KProducer[K <: Product, V <: Product] {
val kafkaProps = new Properties()
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
kafkaProps.put("schema.registry.url", "http://localhost:8081")
private lazy val producer = new KafkaProducer[GenericRecord, GenericRecord](kafkaProps)
def produce(topic: String, key: K, value: V, partition: Int = 0)(implicit toRecordKey: ToRecord[K], fromRecordKey: FromRecord[K], toRecord: ToRecord[V], fromRecord: FromRecord[V]): Future[RecordMetadata] = {
val keyRec = RecordFormat[K].to(key)
val valueRec = RecordFormat[V].to(value)
val data: ProducerRecord[GenericRecord, GenericRecord] = new ProducerRecord(topic, partition, keyRec, valueRec)
producer.send(data)
}
}
开发者ID:cryptocurrencyindia,项目名称:Parsec,代码行数:29,代码来源:KProducer.scala
示例19: ReactiveProducer
//设置package包名称以及导入依赖的类
package co.s4n.reactiveKafka
import akka.actor.ActorSystem
import akka.kafka.ProducerMessage
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import akka.Done
import scala.util.{ Failure, Success }
object ReactiveProducer {
val system = ActorSystem("example")
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer.create(system)
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val kafkaProducer = producerSettings.createKafkaProducer()
def produce(msg: String): Unit = {
val done = Source(1 to 1)
.map(_.toString)
.map { elem =>
println("\n" + msg);
new ProducerRecord[Array[Byte], String]("UsersTopic", msg)
}
.runWith(Producer.plainSink(producerSettings, kafkaProducer))
// #plainSinkWithProducer
// terminateWhenDone(done)
}
def terminateWhenDone(result: Future[Done]): Unit = {
result.onComplete {
case Failure(e) =>
system.log.error(e, e.getMessage)
system.terminate()
case Success(_) => system.terminate()
}
}
}
开发者ID:bazzo03,项目名称:users-api,代码行数:51,代码来源:ReactiveProducer.scala
示例20: KafkaSink
//设置package包名称以及导入依赖的类
package com.flipkart.connekt.firefly.sinks.kafka
import java.util.Properties
import akka.stream.scaladsl.Sink
import com.flipkart.connekt.commons.entities.SubscriptionEvent
import com.flipkart.connekt.commons.factories.{ConnektLogger, LogFile}
import com.flipkart.connekt.commons.utils.StringUtils._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
class KafkaSink(topic: String, brokers: String) {
private val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
private val producer = new KafkaProducer[String, String](props)
def getKafkaSink = Sink.foreach[SubscriptionEvent](e => {
ConnektLogger(LogFile.SERVICE).info(s"KafkaSink event written to $topic at $brokers")
ConnektLogger(LogFile.SERVICE).trace(s"KafkaSink event written to $topic at $brokers : {}", supplier(e.payload.toString))
val data: ProducerRecord[String, String] = new ProducerRecord(topic, e.payload.toString)
producer.send(data)
})
}
开发者ID:ayush03agarwal,项目名称:connekt,代码行数:29,代码来源:KafkaSink.scala
注:本文中的org.apache.kafka.clients.producer.ProducerRecord类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论