Scala ProducerConfig Class Code Examples


This article collects typical usage examples of the org.apache.kafka.clients.producer.ProducerConfig class in Scala. If you have been wondering what ProducerConfig does, how to use it, or where to find working examples, the curated snippets below should help.



The sections below present 15 code examples of the ProducerConfig class, sorted by popularity by default. Feel free to upvote the examples you like or find useful; your feedback helps the site recommend better Scala code examples.
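
Before the examples, here is a minimal, self-contained sketch, not drawn from any of the projects below, showing what ProducerConfig actually provides: String constants for the standard producer configuration keys, so property names come from the client library instead of being hand-typed. The broker address and topic name are placeholders.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object MinimalProducerExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG is the constant "bootstrap.servers"
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      producer.send(new ProducerRecord[String, String]("example-topic", "key", "value"))
    } finally {
      producer.close()
    }
  }
}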

Example 1: KafkaProducer

// Package declaration and imported dependencies
package org.hpi.esb.flink.kafka

import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.writeToKafkaWithTimestamps
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.producer.ProducerConfig

class KafkaProducer(producerTopic: String) extends KafkaConnector {

  val uuid: String = java.util.UUID.randomUUID.toString

  // TODO: read properties from file
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
  props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, s"$producerTopic - $uuid")

  def produce(stream: DataStream[String]): Unit = {
    val config = writeToKafkaWithTimestamps(stream.javaStream, producerTopic, new SimpleStringSchema(), props)
    config.setWriteTimestampToKafka(true)
  }
} 
Author: BenReissaus, Project: EnterpriseStreamingBenchmark, Lines: 21, Source: KafkaProducer.scala
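
Note that `props` and `BOOTSTRAP_SERVERS` are not defined in this snippet; they come from the `KafkaConnector` trait, which the listing does not show. A hypothetical reconstruction of that trait, just so the example compiles in isolation, might look like:

import java.util.Properties

trait KafkaConnector {
  // Hypothetical sketch: the real trait lives elsewhere in the EnterpriseStreamingBenchmark project
  val BOOTSTRAP_SERVERS: String = "localhost:9092"
  val props: Properties = new Properties()
}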


Example 2: DataDriver

// Package declaration and imported dependencies
package org.hpi.esb.datasender

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.hpi.esb.commons.config.Configs
import org.hpi.esb.commons.util.Logging
import org.hpi.esb.datasender.config._
import org.hpi.esb.datasender.output.writers.DatasenderRunResultWriter

import scala.io.Source

class DataDriver() extends Logging {

  private val topics = Configs.benchmarkConfig.sourceTopics
  private val config = ConfigHandler.config
  private val dataReader = createDataReader(config.dataReaderConfig)
  private val kafkaProducerProperties = createKafkaProducerProperties(config.kafkaProducerConfig)
  private val kafkaProducer = new KafkaProducer[String, String](kafkaProducerProperties)
  private val resultHandler = new DatasenderRunResultWriter(config, Configs.benchmarkConfig, kafkaProducer)
  private val dataProducer = createDataProducer(kafkaProducer, dataReader, resultHandler)

  def run(): Unit = {
    dataProducer.execute()
  }

  def createKafkaProducerProperties(kafkaProducerConfig: KafkaProducerConfig): Properties = {

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerConfig.bootstrapServers.get)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerConfig.keySerializerClass.get)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerConfig.valueSerializerClass.get)
    props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerConfig.acks.get)
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerConfig.batchSize.get.toString)
    props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfig.lingerTime.toString)
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProducerConfig.bufferMemorySize.toString)
    props
  }

  def createDataReader(dataReaderConfig: DataReaderConfig): DataReader = {
    new DataReader(Source.fromFile(dataReaderConfig.dataInputPath.get),
      dataReaderConfig.columns.get,
      dataReaderConfig.columnDelimiter.get,
      dataReaderConfig.dataColumnStart.get,
      dataReaderConfig.readInRam)
  }

  def createDataProducer(kafkaProducer: KafkaProducer[String, String], dataReader: DataReader,
                         resultHandler: DatasenderRunResultWriter): DataProducer = {

    val numberOfThreads = config.dataSenderConfig.numberOfThreads.get
    val sendingInterval = Configs.benchmarkConfig.sendingInterval
    val sendingIntervalTimeUnit = Configs.benchmarkConfig.getSendingIntervalTimeUnit()
    val duration = Configs.benchmarkConfig.duration
    val durationTimeUnit = Configs.benchmarkConfig.getDurationTimeUnit()
    val singleColumnMode = config.dataSenderConfig.singleColumnMode

    new DataProducer(resultHandler, kafkaProducer, dataReader, topics, numberOfThreads,
      sendingInterval, sendingIntervalTimeUnit, duration, durationTimeUnit, singleColumnMode)
  }
} 
Author: BenReissaus, Project: EnterpriseStreamingBenchmark, Lines: 62, Source: DataDriver.scala


Example 3: EmbeddedVKitM

// Package declaration and imported dependencies
package com.github.mmolimar.vkitm.embedded

import java.util.Properties

import com.github.mmolimar.vkitm.server.{VKitMConfig, VKitMServer}
import com.github.mmolimar.vkitm.utils.TestUtils
import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.clients.producer.ProducerConfig

class EmbeddedVKitM(zkConnection: String,
                    brokerList: String,
                    port: Int = TestUtils.getAvailablePort) extends Logging {

  private var vkitmServer: VKitMServer = null

  def startup() {
    info("Starting up VKitM server")

    val serverProps = new Properties
    serverProps.setProperty(KafkaConfig.ZkConnectProp, zkConnection)
    serverProps.setProperty(KafkaConfig.HostNameProp, "localhost")
    serverProps.setProperty(KafkaConfig.PortProp, port.toString)
    serverProps.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:" + port)

    val producerProps = new Properties
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)

    val brokerPort = brokerList.split(":")(1)
    val consumerProps = new Properties
    consumerProps.setProperty(KafkaConfig.ZkConnectProp, zkConnection)
    consumerProps.setProperty(KafkaConfig.HostNameProp, "localhost")
    consumerProps.setProperty(KafkaConfig.PortProp, brokerPort)
    consumerProps.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:" + brokerPort)

    vkitmServer = new VKitMServer(VKitMConfig.fromProps(serverProps, producerProps, consumerProps))
    vkitmServer.startup()

    info("Started embedded VKitM server")
  }

  def shutdown() {
    vkitmServer.shutdown()
  }

  def getPort: Int = port

  def getBrokerList: String = "localhost:" + getPort

  def getServer: VKitMServer = vkitmServer

  override def toString: String = {
    val sb: StringBuilder = StringBuilder.newBuilder
    sb.append("VKitM{")
    sb.append("config='").append(vkitmServer.config).append('\'')
    sb.append('}')

    sb.toString
  }

} 
Author: mmolimar, Project: vkitm, Lines: 62, Source: EmbeddedVKitM.scala
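
A hedged sketch of how an embedded server like this is typically driven from a test; the ZooKeeper and broker addresses are placeholders, not values from the project:

// Assumes: import com.github.mmolimar.vkitm.embedded.EmbeddedVKitM
object EmbeddedVKitMUsage {
  def main(args: Array[String]): Unit = {
    val vkitm = new EmbeddedVKitM("localhost:2181", "localhost:9092")
    vkitm.startup()
    try {
      // Point test producers at the VKitM endpoint instead of the real broker
      println(s"VKitM listening on ${vkitm.getBrokerList}")
    } finally {
      vkitm.shutdown()
    }
  }
}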


Example 4: KProducer

// Package declaration and imported dependencies
package org.parsec

import java.util.Properties
import java.util.concurrent.Future

import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

// Convenient Kafka producer using avro4s
class KProducer[K <: Product, V <: Product] {

  val kafkaProps = new Properties()
  kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
  kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
  kafkaProps.put("schema.registry.url", "http://localhost:8081")
  private lazy val producer  = new KafkaProducer[GenericRecord, GenericRecord](kafkaProps)

  def produce(topic: String, key: K, value: V, partition: Int = 0)(implicit toRecordKey: ToRecord[K], fromRecordKey: FromRecord[K], toRecord: ToRecord[V], fromRecord: FromRecord[V]): Future[RecordMetadata] = {
    val keyRec = RecordFormat[K].to(key)
    val valueRec = RecordFormat[V].to(value)
    val data: ProducerRecord[GenericRecord, GenericRecord] = new ProducerRecord(topic, partition, keyRec, valueRec)
    producer.send(data)
  }

} 
Author: cryptocurrencyindia, Project: Parsec, Lines: 29, Source: KProducer.scala
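
Because `K` and `V` are bounded by `Product`, the key and value are ordinary case classes and avro4s derives the Avro conversions implicitly. A hypothetical usage sketch; the case classes and topic name are invented for illustration:

// Assumes: import org.parsec.KProducer and avro4s on the classpath
case class SensorKey(id: String)
case class SensorReading(temperature: Double, unit: String)

object KProducerUsage {
  def main(args: Array[String]): Unit = {
    val producer = new KProducer[SensorKey, SensorReading]
    // ToRecord/FromRecord instances are materialized implicitly by avro4s
    val metadata = producer.produce("sensor-readings", SensorKey("s-1"), SensorReading(21.5, "C"))
    println(metadata.get()) // blocks until the broker acknowledges the record
  }
}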


Example 5: Module

// Package declaration and imported dependencies
package com.github.dnvriend

import com.google.inject.{AbstractModule, Provides}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.springframework.kafka.core.{DefaultKafkaConsumerFactory, DefaultKafkaProducerFactory, KafkaTemplate}
import org.springframework.kafka.listener.config.ContainerProperties
import org.springframework.kafka.listener.{KafkaMessageListenerContainer, MessageListener}

import scala.collection.JavaConversions._

class Module extends AbstractModule {
  protected def configure(): Unit = {
  }

  @Provides
  def createProducerTemplate: KafkaTemplate[String, String] = {
    val senderProps: java.util.Map[String, Any] = Map(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ProducerConfig.RETRIES_CONFIG -> 0,
      ProducerConfig.BATCH_SIZE_CONFIG -> 16384,
      ProducerConfig.LINGER_MS_CONFIG -> 1,
      ProducerConfig.BUFFER_MEMORY_CONFIG -> 33554432,
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer],
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer]
    )
    val producerFactory = new DefaultKafkaProducerFactory[String, String](senderProps.mapValues(_.asInstanceOf[AnyRef]))
    new KafkaTemplate[String, String](producerFactory)
  }

  @Provides
  def createKafkaMessageListenerContainer(messageListener: MessageListener[String, String]): KafkaMessageListenerContainer[String, String] = {
    val consumerProps: java.util.Map[String, Any] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "group",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> true,
      ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "100",
      ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> "15000",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val containerProperties = new ContainerProperties("test")
    containerProperties.setMessageListener(messageListener)

    val consumerFactory = new DefaultKafkaConsumerFactory[String, String](consumerProps.mapValues(_.asInstanceOf[AnyRef]))
    val container = new KafkaMessageListenerContainer[String, String](consumerFactory, containerProperties)
    container.setBeanName("testAuto")
    container.start()
    container
  }

  @Provides
  def messageListener: MessageListener[String, String] = new MessageListener[String, String] {
    override def onMessage(message: ConsumerRecord[String, String]): Unit = {
      println(s"received: $message")
    }
  }
} 
Author: dnvriend, Project: spring-kafka-test, Lines: 61, Source: Module.scala
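
For completeness, a minimal sketch of how the `KafkaTemplate` provided above is typically used. This is the standard spring-kafka send API; the topic name and payload are invented:

object KafkaTemplateUsage {
  import org.springframework.kafka.core.KafkaTemplate

  def sendGreeting(template: KafkaTemplate[String, String]): Unit = {
    // send(topic, key, value) returns a future that completes when the broker acknowledges
    template.send("test", "key-1", "hello")
  }
}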


Example 6: Producer

// Package declaration and imported dependencies
package co.coinsmith.kafka.cryptocoin.producer

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object Producer {
  val conf = ConfigFactory.load
  val brokers = conf.getString("kafka.cryptocoin.bootstrap-servers")
  val schemaRegistryUrl = conf.getString("kafka.cryptocoin.schema-registry-url")

  val props = new Properties
  props.put("bootstrap.servers", brokers)
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", schemaRegistryUrl)
  val producer = new KafkaProducer[Object, Object](props)

  def send(topic: String, msg: Object) {
    val data = new ProducerRecord[Object, Object](topic, msg)
    producer.send(data)
  }
} 
Author: blbradley, Project: kafka-cryptocoin, Lines: 25, Source: Producer.scala


Example 7: KafkaProducerFactory

// Package declaration and imported dependencies
package service

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

object KafkaProducerFactory {
  def create(kafkaConnectionString: String) : KafkaProducer[String, String] = {
    val props = new java.util.Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnectionString)
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    new KafkaProducer[String, String](props)
  }
} 
Author: MarketWatcher, Project: data-ingestion-service, Lines: 16, Source: KafkaProducerFactory.scala


Example 8: KafkaProducerWrapper

// Package declaration and imported dependencies
package articlestreamer.shared.kafka

import java.util.Properties

import articlestreamer.shared.configuration.ConfigLoader
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.SslConfigs


class KafkaProducerWrapper(config: ConfigLoader, factory: KafkaFactory[String, String]) {

  private val producer = factory.getProducer(KafkaProducerWrapper.getProperties(config))

  def send(record: ProducerRecord[String, String]) = producer.send(record, new RecordCallback)

  def stopProducer() = {
    producer.close()
  }

}

object KafkaProducerWrapper {

  def getProperties(config: ConfigLoader): Properties = {

    import config._

    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
    properties.put(ProducerConfig.ACKS_CONFIG, "1")
    properties.put(ProducerConfig.RETRIES_CONFIG, 0.asInstanceOf[AnyRef])
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384.asInstanceOf[AnyRef])
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1.asInstanceOf[AnyRef])
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432.asInstanceOf[AnyRef])
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000.asInstanceOf[AnyRef])
    properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000.asInstanceOf[AnyRef])

    if (kafkaSSLMode) {
      properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
      properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, s"$kafkaTrustStore/truststore.jks")
      properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234")
      properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, s"$kafkaTrustStore/keystore.jks")
      properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234")
      properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234")
    }

    properties
  }

} 
Author: firens, Project: article-streamer, Lines: 54, Source: KafkaProducerWrapper.scala


Example 9: KafkaFactorySpec

// Package declaration and imported dependencies
package articlestreamer.shared.kafka

import java.util.Properties

import articlestreamer.shared.BaseSpec
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}


class KafkaFactorySpec extends BaseSpec {

  val factory = new KafkaFactory[String, String]

  val serverProps = new Properties()
  serverProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:8080")
  serverProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  serverProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

  val consumerProps = new Properties()
  consumerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:8080")
  consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

  "Factory" should "provide a producer" in {
    val producer = factory.getProducer(serverProps)
    producer.close()
    producer shouldBe a [KafkaProducer[_, _]]
  }

  "Factory" should "provide a consumer" in {
    val consumer = factory.getConsumer(consumerProps)
    consumer.close()
    consumer shouldBe a [KafkaConsumer[_, _]]
  }

} 
Author: firens, Project: article-streamer, Lines: 38, Source: KafkaFactorySpec.scala


Example 10: KProducer

// Package declaration and imported dependencies
package org.parsec

import java.util.Properties
import java.util.concurrent.Future

import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

// Convenient Kafka producer using avro4s
class KProducer[K <: Product, V <: Product] {

  val kafkaProps = new Properties()
  kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "parsec.playground.landoop.com:49092")
  kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
  kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
  kafkaProps.put("schema.registry.url", "http://parsec.playground.landoop.com:48081")
  private lazy val producer  = new KafkaProducer[GenericRecord, GenericRecord](kafkaProps)

  def produce(topic: String, key: K, value: V, partition: Int = 0)(implicit toRecordKey: ToRecord[K], fromRecordKey: FromRecord[K], toRecord: ToRecord[V], fromRecord: FromRecord[V]): Future[RecordMetadata] = {
    val keyRec = RecordFormat[K].to(key)
    val valueRec = RecordFormat[V].to(value)
    val data: ProducerRecord[GenericRecord, GenericRecord] = new ProducerRecord(topic, partition, keyRec, valueRec)
    producer.send(data)
  }

} 
Author: parsec-network, Project: parsec, Lines: 29, Source: KProducer.scala


Example 11: KafkaClusterService

// Package declaration and imported dependencies
package services

import java.util.Properties
import javax.inject.Inject

import crawlerConfig.CrawlerConfig
import crawlerglobal.Global
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}


class KafkaClusterService @Inject() () {
  private var producerOpt: Option[KafkaProducer[String, String]] = None

  initKafkaCluster()

  @inline
  private def getBootstrapServers(default: String = "localhost:9002"): String = {
    CrawlerConfig.getValue("Kafka.BootstrapServers", default)
  }

  @inline
  private def getAcksConfig(default: String = "0"): String = {
    CrawlerConfig.getValue("Kafka.Required.acks", default)
  }

  private def getProperties: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers())
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.StringSerializer])
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.StringSerializer])
    props.put(ProducerConfig.ACKS_CONFIG, getAcksConfig())
    props
  }

  private def initKafkaCluster() = {
    val producer = new KafkaProducer[String, String](getProperties)
    producerOpt = Some(producer)
    Global.KafkaClusterOpt = Some(this)
  }

  def send(topic: String, data: String) = {
    producerOpt match {
      case Some(producer) =>
        producer.send(new ProducerRecord(topic, data))
        true
      case None =>
        false
    }
  }

  def stop() = {
    producerOpt match {
      case Some(producer) => producer.close()
      case None =>
    }
  }
} 
Author: TopSpoofer, Project: CodingCrawler, Lines: 58, Source: KafkaClusterService.scala
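
Usage of the service reduces to `send`, whose Boolean result reports whether the producer was initialised. A hypothetical sketch; the topic and payload are invented:

// Assumes: import services.KafkaClusterService
object KafkaClusterServiceUsage {
  def main(args: Array[String]): Unit = {
    val service = new KafkaClusterService()
    if (!service.send("crawl-results", """{"url": "http://example.com"}"""))
      println("Kafka producer was not initialised")
    service.stop()
  }
}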


Example 12: OutputManager

// Package declaration and imported dependencies
package iomanager

import java.util

import com.typesafe.config.Config
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.streaming.Time

import scala.collection.JavaConversions._
import scala.collection.parallel.mutable.ParArray

object OutputManager {

  var producer: KafkaProducer[String, String] = null
  var predictionWindow = 0

  def prepareOutputStream(config: Config) = {

    predictionWindow = config.getInt("output.predictionWindow")*1000

    val brokers = config.getStringList("output.kafka.brokers").reduce(_ + "," + _)

    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    producer = new KafkaProducer[String, String](props)
  }

  def sendPredictions(predictions: (
    ParArray[(String, Double, String, String)],
      ParArray[(String, Double, String, String)]), time: Time) = {
    val simplePredictions =
      "{\"predictionStart\":"+time.milliseconds+
        ",\"predictionEnd\":"+(time.milliseconds+predictionWindow)+
      ",\"positive\":["+predictions._1.map(_._3).mkString(",")+
      "],\"negative\":["+predictions._2.map(_._3).mkString(",")+"]}"
    val advancedPredictions =
      "{\"predictionStart\":"+time.milliseconds+
        ",\"predictionEnd\":"+(time.milliseconds+predictionWindow)+
      ",\"positive\":["+predictions._1.map(_._4).mkString(",")+
      "],\"negative\":["+predictions._2.map(_._4).mkString(",")+"]}"

    val simpleMess =
      new ProducerRecord[String, String]("simple-predictions",simplePredictions)
    val advancedMess =
      new ProducerRecord[String, String]("advanced-predictions",advancedPredictions)

    producer.send(simpleMess)
    producer.send(advancedMess)
  }

} 
Author: jandion, Project: SparkOFP, Lines: 57, Source: OutputManager.scala


Example 13: KafkaWordCountProducer

// Package declaration and imported dependencies
package com.jcode.spark.streaming

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._


object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
//      System.exit(1)
    }

//    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = Array("192.168.1.234:9092","sparkStreamingTest","1","5")

    // Kafka producer properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
// scalastyle:on println 
Author: dreajay, Project: jcode-spark, Lines: 50, Source: KafkaWordCountProducer.scala


Example 14: KafkaConfig

// Package declaration and imported dependencies
package com.github.dnvriend

import java.util.Properties

import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.ProducerConfig

// see: http://docs.confluent.io/3.1.2/streams/developer-guide.html#overview
object KafkaConfig {
  def configAsMap = Map(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
    "schema.registry.url" -> "http://localhost:8081"
  )

  def config(): Properties = {
    import scala.collection.JavaConverters._
    val settings = new Properties
    settings.putAll(configAsMap.asJava)
    settings
  }
} 
Author: dnvriend, Project: kafka-streams-test, Lines: 24, Source: KafkaConfig.scala
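
A hedged usage sketch of the `KafkaConfig` object above: `config()` yields a `java.util.Properties` that can be handed straight to a producer constructor. The record types assume the Avro serializers configured in the map:

// Assumes: import com.github.dnvriend.KafkaConfig
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer

object KafkaConfigUsage {
  def main(args: Array[String]): Unit = {
    // config() copies configAsMap into a Properties instance
    val producer = new KafkaProducer[GenericRecord, GenericRecord](KafkaConfig.config())
    producer.close()
  }
}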


Example 15: KafkaWSContext

// Package declaration and imported dependencies
package com.landoop.kafka.ws

import java.util.Properties

import com.landoop.kafka.ws.core.decoders.DecoderType
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.{ByteArraySerializer, IntegerSerializer, LongSerializer, StringSerializer}


case class KafkaWSContext(config: KafkaWSConfig) {
  require(config != null, "Null instance for config parameter")

  val KafkaProducer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
    new KafkaProducer[String, String](props)
  }

  def getProducer[K, V](keyDecoder: DecoderType, valueDecoder: DecoderType): KafkaProducer[K, V] = {
    val props = new Properties()
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getSerializer(keyDecoder).getCanonicalName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializer(valueDecoder).getCanonicalName)
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
    props.put("schema.registry.url", config.schemaRegistryUrl)
    new KafkaProducer[K, V](props)
  }

  def getSerializer(decoderType: DecoderType): Class[_] = {
    decoderType match {
      case DecoderType.AVRO => classOf[KafkaAvroSerializer]
      case DecoderType.BINARY => classOf[ByteArraySerializer]
      case DecoderType.LONG => classOf[LongSerializer]
      case DecoderType.INT => classOf[IntegerSerializer]
      case DecoderType.STRING | DecoderType.JSON => classOf[StringSerializer]
      case other => throw new IllegalArgumentException(s"Decoder type '$other' is not recognized.")
    }
  }
} 
Author: Landoop, Project: kafka-ws, Lines: 42, Source: KafkaWSContext.scala



Note: The org.apache.kafka.clients.producer.ProducerConfig examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from community-contributed open-source projects; copyright remains with the original authors. Consult each project's license before redistributing or reusing the code, and do not republish without permission.

