本文整理汇总了Scala中kafka.producer.KeyedMessage类的典型用法代码示例。如果您正苦于以下问题：Scala KeyedMessage类的具体用法？Scala KeyedMessage怎么用？Scala KeyedMessage使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了KeyedMessage类的8个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: TestProducer
//设置package包名称以及导入依赖的类
package com.rockiey.kafka
import java.util.{Date, Properties, Random}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.junit.{After, Before, Test}
/**
 * JUnit test that sends synthetic events to the `test` topic through the legacy
 * (`kafka.producer`) asynchronous producer, using the broker at `localhost:9092`.
 *
 * A producer is created before each test and closed after it.
 */
class TestProducer {
  val brokers = "localhost:9092"
  val topic = "test"
  val rnd = new Random()

  val props = new Properties()
  props.put("metadata.broker.list", brokers)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  //props.put("partitioner.class", "com.colobu.kafka.SimplePartitioner")
  props.put("producer.type", "async")
  //props.put("request.required.acks", "1")

  // Assigned in `before`; stays null until then (and if producer construction fails).
  var producer: Producer[String, String] = null

  /** Builds a fresh producer from `props` for the test about to run. */
  @Before
  def before: Unit = {
    producer = new Producer[String, String](new ProducerConfig(props))
  }

  /** Closes the producer if one was created. */
  @After
  def after: Unit = {
    // JUnit runs @After methods even when @Before threw, in which case `producer`
    // is still null; skip the close instead of masking the real failure with an NPE.
    Option(producer).foreach(_.close())
  }

  /**
   * Sends `events` messages of the form `<millis>,<index>,www.example.com,<ip>`, keyed by
   * the generated IP address, then prints the observed send rate.
   *
   * @param events number of messages to send
   */
  def produce(events: Int): Unit = {
    val startedAt = System.currentTimeMillis()
    for (nEvents <- 0 until events) {
      val runtime = new Date().getTime()
      // nextInt(255) yields 0..254, so the last octet never reaches 255.
      val ip = "192.168.2." + rnd.nextInt(255)
      val msg = s"$runtime,$nEvents,www.example.com,$ip"
      producer.send(new KeyedMessage[String, String](topic, ip, msg))
    }
    // A short run can finish within the same millisecond; clamp the divisor to avoid a
    // division by zero, and multiply as Long so large event counts do not overflow Int.
    val elapsedMs = math.max(1L, System.currentTimeMillis() - startedAt)
    System.out.println("sent per second: " + events * 1000L / elapsedMs)
  }

  @Test
  def testProducer: Unit = {
    produce(100)
  }

  // Placeholder: no consumer-side assertions are implemented.
  @Test
  def testConsumer: Unit = {
  }
}
开发者ID:rockie-yang,项目名称:explore-spark-kafka,代码行数:58,代码来源:TestProducer.scala
示例2: KafkaUtilities
//设置package包名称以及导入依赖的类
package com.fortysevendeg.log.utils
import java.util.Properties
import kafka.admin.AdminUtils
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkConnection
import org.apache.kafka.clients.consumer.KafkaConsumer
/**
 * Helpers for a local Kafka/ZooKeeper setup: factory methods for a legacy producer and a
 * new-API consumer, topic creation, and a one-shot send.
 *
 * All endpoints are hard-coded to `localhost` (Kafka on 9092, ZooKeeper on 2181).
 */
object KafkaUtilities {

  /** Creates an asynchronous legacy producer with string-encoded messages and `acks = 1`. */
  def createKafkaProducer(): Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    //    props.put("partitioner.class", "com.fortysevendeg.biglog.SimplePartitioner")
    //    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    //    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("producer.type", "async")
    props.put("request.required.acks", "1")
    new Producer[String, String](new ProducerConfig(props))
  }

  /** Creates a new-API consumer that deserializes both keys and values as strings. */
  def createKafkaConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    new KafkaConsumer[String, String](props)
  }

  /**
   * Creates `topic` through ZooKeeper with default topic-level configuration.
   *
   * @param topic             name of the topic to create
   * @param numPartitions     number of partitions
   * @param replicationFactor number of replicas per partition
   */
  def createTopicIntoKafka(topic: String, numPartitions: Int, replicationFactor: Int): Unit = {
    val zookeeperConnect = "localhost:2181"
    val sessionTimeoutMs = 10 * 1000
    val connectionTimeoutMs = 8 * 1000
    val zkClient = ZkUtils.createZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs)
    try {
      val zkUtils = new ZkUtils(zkClient, zkConnection = new ZkConnection(zookeeperConnect), isSecure = false)
      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, new Properties)
    } finally {
      // Release the ZooKeeper client even when topic creation throws; otherwise the
      // client stays open.
      zkClient.close()
    }
  }

  /** Sends `message` to `topic` without a key using the supplied producer. */
  def d(kafkaProducer: Producer[String, String], topic: String, message: String): Unit = {
    kafkaProducer.send(new KeyedMessage[String, String](topic, message))
  }
}
开发者ID:javipacheco,项目名称:spark-android-log,代码行数:52,代码来源:KafkaUtilities.scala
示例3: WaspKafkaWriter.send
//设置package包名称以及导入依赖的类
package it.agilelab.bigdata.wasp.core.kafka
import java.util.Properties
import it.agilelab.bigdata.wasp.core.WaspEvent
import it.agilelab.bigdata.wasp.core.WaspEvent.WaspMessageEnvelope
import it.agilelab.bigdata.wasp.core.models.configuration.{TinyKafkaConfig, KafkaConfigModel}
import kafka.producer.{DefaultPartitioner, KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import kafka.server.KafkaConfig
/** Sends one keyed message to `topic` by delegating to `batchSend` with a single-element batch. */
def send(topic: String, key: K, message: V): Unit = {
  val singleMessageBatch = Seq(message)
  batchSend(topic, key, singleMessageBatch)
}
/**
 * Wraps every element of `batch` in a `KeyedMessage` carrying the same `topic` and `key`,
 * then passes them all to the producer in one `send` call.
 */
def batchSend(topic: String, key: K, batch: Seq[V]): Unit = {
  val keyed = for (payload <- batch) yield new KeyedMessage[K, V](topic, key, payload)
  producer.send(keyed.toArray: _*)
}
/** Closes the underlying producer. */
def close(): Unit = {
  producer.close()
}
}
/** Factory methods for the `ProducerConfig` used by the legacy Kafka producer. */
object WaspKafkaWriter {

  /**
   * Builds a producer configuration from explicit settings. `request.required.acks` is
   * always set to `"1"`.
   *
   * @param brokers           broker addresses, joined with commas into `metadata.broker.list`
   * @param batchSize         value for `batch.num.messages`
   * @param producerType      value for `producer.type`
   * @param serializerFqcn    class name for `serializer.class`
   * @param keySerializerFqcn class name for `key.serializer.class`
   * @param partitionerFqcn   class name for `partitioner.class`
   */
  def createConfig(
      brokers: Set[String],
      batchSize: Int,
      producerType: String,
      serializerFqcn: String,
      keySerializerFqcn: String,
      partitionerFqcn: String): ProducerConfig = {
    val settings = Seq(
      "metadata.broker.list" -> brokers.mkString(","),
      "serializer.class" -> serializerFqcn,
      "key.serializer.class" -> keySerializerFqcn,
      "partitioner.class" -> partitionerFqcn,
      "producer.type" -> producerType,
      "request.required.acks" -> "1",
      "batch.num.messages" -> batchSize.toString
    )
    val props = new Properties()
    settings.foreach { case (name, value) => props.put(name, value) }
    new ProducerConfig(props)
  }

  /**
   * Builds the default configuration: the single broker at `config.hostName:config.port`,
   * batches of 100, async sending, string encoders for key and value, default partitioner.
   */
  def defaultConfig(config: KafkaConfig): ProducerConfig = {
    val stringEncoder = classOf[StringEncoder].getName
    val brokerAddress = s"${config.hostName}:${config.port}"
    createConfig(
      brokers = Set(brokerAddress),
      batchSize = 100,
      producerType = "async",
      serializerFqcn = stringEncoder,
      keySerializerFqcn = stringEncoder,
      partitionerFqcn = classOf[DefaultPartitioner].getName
    )
  }
}
开发者ID:agile-lab-dev,项目名称:wasp,代码行数:43,代码来源:WaspKafkaWriter.scala
示例4: KafkaEndpoint
//设置package包名称以及导入依赖的类
package controllers
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import model.grid.Flight
import model.kafka.FlightEvent
import model.web.SubmittedFlight
import play.api.libs.json._
import play.api.mvc._
/**
 * Play controller that accepts a submitted flight as JSON, assigns it a sequential row id,
 * and publishes the resulting `FlightEvent` (as its `toString`) to the `flights` topic.
 */
object KafkaEndpoint extends Controller {
  // Source of row ids; the first submitted flight gets 1.
  val counter = new AtomicInteger(0)

  implicit val flightsReader: Reads[Flight] = Json.reads[Flight]
  implicit val submittedFlightsReader: Reads[SubmittedFlight] = Json.reads[SubmittedFlight]

  /** Responds 201 with the new row id for a valid body, 400 with the validation errors otherwise. */
  def submitFlight = Action(parse.json) { request =>
    parseJson(request) { flight: SubmittedFlight =>
      val rowId = counter.incrementAndGet()
      send(FlightEvent(rowId, flight).toString(), "flights")
      Created(rowId.toString)
    }
  }

  /**
   * Validates the request body as an `R` and hands the value to `block`; on validation
   * failure responds with `BadRequest` containing the concatenated errors.
   */
  private def parseJson[R](request: Request[JsValue])(block: R => Result)(implicit reads: Reads[R]): Result = {
    request.body.validate[R](reads) match {
      case JsSuccess(parsed, _) => block(parsed)
      case JsError(errors) => BadRequest(errors.mkString)
    }
  }

  // hardcoded to simplify the demo code
  lazy val kafkaConfig = {
    val settings = new Properties()
    settings.put("metadata.broker.list", "localhost:9092")
    settings.put("serializer.class", "kafka.serializer.StringEncoder")
    settings
  }

  // Created on first use rather than when the controller object is initialized.
  lazy val producer = new Producer[String, String](new ProducerConfig(kafkaConfig))

  /** Publishes `message` to `topic` without a key. */
  private def send(message: String, topic: String) = {
    val record = new KeyedMessage[String, String](topic, message)
    producer.send(record)
  }
}
开发者ID:garvsd,项目名称:flight-delay-prediction_using-spark-with-hdfs-on-HDP,代码行数:51,代码来源:KafkaEndpoint.scala
示例5: TicketsProducer
//设置package包名称以及导入依赖的类
package com.octo.nad.handson.producer
import java.util.Properties
import java.util.concurrent.Executors
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
/**
 * Command-line entry point that generates tickets on a pool of worker threads and publishes
 * each one, serialized with `toJson`, to Kafka through one shared synchronous producer.
 *
 * `brokers` and `topic` are not defined in this object; presumably they come from
 * `AppConf` — confirm.
 */
object TicketsProducer extends App with AppConf {
  // Base pause in milliseconds: the single command-line argument when exactly one is given,
  // otherwise 1000. Left as a public `var` because code outside this object may reassign it.
  var sleep = if (args.length == 1) args(0).toInt else 1000
  val millisInAnHour = 60 * 60 * 1000
  // Almost all available cores are used to generate tickets (CPU-bound work).
  val cores = Runtime.getRuntime.availableProcessors

  val props = new Properties
  props.put("metadata.broker.list", brokers)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("producer.type", "sync")
  val producer = new Producer[String, String](new ProducerConfig(props))

  produce()
  ThroughoutMeter.start
  ThroughputCursor.start

  /** Starts the worker threads; each runs a `ProducerRunnable` that loops forever. */
  def produce(): Unit = {
    val pool = Executors.newFixedThreadPool(cores)
    // One core is left free, but at least one worker must run: on a single-core machine
    // `cores - 1` is 0 and nothing would ever be produced.
    val workerCount = math.max(1, cores - 1)
    (0 until workerCount).foreach(_ => pool.submit(new ProducerRunnable))
  }

  /** Endless loop: generate a ticket, count it, pause, then send it to `topic`. */
  class ProducerRunnable extends Runnable {
    override def run(): Unit = {
      while (true) {
        val ticket = TicketsGenerator.generateTicket
        ThroughoutMeter.counter += 1
        // Sinusoid with a one-hour period so that revenue is generated non-linearly:
        // the pause oscillates between 0.5x and 1.5x of `cores * sleep` milliseconds.
        val pauseMs =
          (cores * sleep * (1 + 0.5 * Math.cos(System.currentTimeMillis() * 2 * Math.PI / millisInAnHour))).toInt
        Thread.sleep(pauseMs)
        producer.send(new KeyedMessage(topic, ticket.toJson))
      }
    }
  }
}
开发者ID:tmouron,项目名称:hands-on-spark,代码行数:44,代码来源:TicketsProducer.scala
示例6: KafkaProducer
//设置package包名称以及导入依赖的类
package com.project.producer
import akka.actor.Actor
import akka.actor.Props
import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.KeyedMessage
/**
 * Actor that forwards every `String` it receives to a Kafka topic through an asynchronous
 * legacy producer; messages of any other type are ignored.
 *
 * @param broker broker list for `metadata.broker.list`; also used as the key of every message
 * @param topic  topic the messages are published to
 */
class KafkaProducer(val broker: String, val topic: String) extends Actor {
  val props = new Properties()
  props.put("metadata.broker.list", broker)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("producer.type", "async")

  val config = new ProducerConfig(props)
  val producer = new Producer[String, String](config)
  //producer.send(new KeyedMessage[String, String](topic, broker, "test nachricht test"))

  def receive = {
    // NOTE(review): every message gets the same key (`broker`); presumably this pins all
    // traffic to one partition — confirm that this is intended.
    case x: String => producer.send(new KeyedMessage[String, String](topic, broker, x))
    case _ => ()
  }

  /** Closes the producer when the actor stops so it is not left open. */
  override def postStop(): Unit = {
    try producer.close()
    finally super.postStop()
  }
}
/** Companion providing the `Props` factory for [[KafkaProducer]] actors. */
object KafkaProducer {

  /** Returns `Props` that construct a `KafkaProducer` for the given broker and topic. */
  def props(broker: String, topic: String): Props = {
    Props(new KafkaProducer(broker, topic))
  }
}
开发者ID:jlagarden,项目名称:dhbw-project-app,代码行数:31,代码来源:KafkaProducer.scala
示例7: KafkaProducer
//设置package包名称以及导入依赖的类
package twitter
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.slf4j.LoggerFactory
/**
 * Spark Streaming job that maps incoming tweets to `Tweet` values and publishes each one
 * as JSON to the Kafka topic `TOPIC`.
 *
 * The stream is wired up while this object initializes, before the body of `main` starts
 * the streaming context. `p` and `TOPIC` come from the `Configuration` import.
 */
object KafkaProducer {
  import Configuration._

  private val log = LoggerFactory.getLogger(getClass)

  /** Starts the streaming context and blocks until it terminates. */
  def main(args: Array[String]): Unit = {
    Setup.ssc.start()
    Setup.ssc.awaitTermination()
  }

  log.info("Creating Kafka producer")
  val stream = Setup.createStream
  val producedTweets = Setup.ssc.sparkContext.accumulator(0L, "Kafka produced Tweets")

  stream.map { tweet =>
    // getGeoLocation returns null for tweets without coordinates.
    val location = Option(tweet.getGeoLocation).map { gl =>
      Map("lat" -> gl.getLatitude, "lon" -> gl.getLongitude)
    }
    Tweet(tweet.getText, tweet.getCreatedAt, location, tweet.getLang, tweet.getUser.getName)
  }.foreachRDD { rdd =>
    log.info(s"RDD size: ${rdd.count()}")
    log.info(s"Total tweets produced: ${producedTweets.value}")
    rdd.foreachPartition { partition =>
      // One producer per partition, created inside the partition closure.
      val producer = new Producer[String, String](new ProducerConfig(p))
      try {
        partition.foreach { tweet =>
          producedTweets += 1
          producer.send(new KeyedMessage[String, String](TOPIC, TweetSerializer.toJson(tweet)))
        }
      } finally {
        // Close the producer even when a send fails, so a failing partition does not
        // leave it open.
        producer.close()
      }
    }
  }

  log.info("Starting Twitter Kafka producer stream")
}
开发者ID:airtonjal,项目名称:DevCamp-2016,代码行数:46,代码来源:KafkaProducer.scala
示例8: TweetsKafkaProducer
//设置package包名称以及导入依赖的类
package org.cg.spark.databroker.example
import java.util.Properties
import com.twitter.bijection.avro.SpecificAvroCodecs.{toJson, toBinary}
import com.typesafe.config.ConfigFactory
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import twitter4j.{Status, FilterQuery}
import twitter4j.TwitterStream
import org.cg.spark.databroker.example.TwitterStream.OnTweetPosted
/**
 * Listens to a filtered Twitter stream and publishes each status, converted to a `Tweet`
 * and binary-encoded with `toBinary`, to the Kafka topic `tweets`.
 */
object TweetsKafkaProducer {
  private val conf = ConfigFactory.load("tweets-kafka")

  val KafkaTopic = "tweets"

  // Broker list is read from the `kafka.brokers` key of the "tweets-kafka" configuration.
  val kafkaProducer = {
    val settings = new Properties()
    settings.put("metadata.broker.list", conf.getString("kafka.brokers"))
    settings.put("request.required.acks", "1")
    new Producer[String, Array[Byte]](new ProducerConfig(settings))
  }

  // Location filter built from two coordinate pairs.
  // NOTE(review): presumably (longitude, latitude) corners of a bounding box — confirm.
  val filterUsOnly = {
    val firstCorner = Array(-126.562500, 30.448674)
    val secondCorner = Array(-61.171875, 44.087585)
    new FilterQuery().locations(Array(firstCorner, secondCorner))
  }

  /** Registers the Kafka-forwarding listener on the stream and applies the location filter. */
  def main(args: Array[String]): Unit = {
    val stream = TwitterStream.getStream
    stream.addListener(new OnTweetPosted(status => sendToKafka(toTweet(status))))
    stream.filter(filterUsOnly)
  }

  /** Builds a `Tweet` from the status author's name and its text. */
  private def toTweet(s: Status): Tweet = new Tweet(s.getUser.getName, s.getText)

  /** Prints the tweet, then sends its binary encoding to `KafkaTopic` without a key. */
  private def sendToKafka(t: Tweet): Unit = {
    println(t.toString())
    val payload = toBinary[Tweet].apply(t)
    kafkaProducer.send(new KeyedMessage[String, Array[Byte]](KafkaTopic, payload))
  }
}
开发者ID:CodeGerm,项目名称:spark-databroker,代码行数:50,代码来源:TweetsKafkaProducer.scala
注：本文中的kafka.producer.KeyedMessage类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论