• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala KafkaConfig类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中kafka.server.KafkaConfig的典型用法代码示例。如果您正苦于以下问题:Scala KafkaConfig类的具体用法?Scala KafkaConfig怎么用?Scala KafkaConfig使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了KafkaConfig类的5个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ClientProducerRequest

//设置package包名称以及导入依赖的类
package com.github.mmolimar.vkitm.common.cache

import java.util.Properties

import kafka.server.KafkaConfig
import org.apache.kafka.clients.ManualMetadataUpdater
import org.apache.kafka.common.metrics.Metrics

/**
 * Marker trait for the request types declared in this file.
 *
 * It declares no members and is only visible inside the `cache` package.
 */
private[cache] trait CacheEntry

/**
 * Describes a producer client by client id, broker list and acks setting.
 *
 * Only the first parameter list takes part in the generated `equals`, `hashCode` and
 * `toString`; `props` lives in the second list, so two requests that differ only in
 * their properties still compare equal.
 *
 * @param clientId   client identifier
 * @param brokerList broker list string (presumably `host:port[,host:port]` -- TODO confirm)
 * @param acks       acknowledgement setting
 * @param props      additional properties; defaults to an empty `Properties`
 */
case class ClientProducerRequest(clientId: String, brokerList: String, acks: Short)(
    val props: Properties = new Properties()
) extends CacheEntry

/**
 * Describes a network client identified solely by its client id.
 *
 * Only `clientId` (the first parameter list) takes part in the generated `equals`,
 * `hashCode` and `toString`; the members of the second list are carried along as
 * plain fields and never affect equality.
 *
 * @param clientId        client identifier
 * @param metadataUpdater metadata updater associated with the client
 * @param config          broker configuration associated with the client
 * @param metrics         metrics registry associated with the client
 */
case class NetworkClientRequest(clientId: String)(
    val metadataUpdater: ManualMetadataUpdater,
    val config: KafkaConfig,
    val metrics: Metrics
) extends CacheEntry
开发者ID:mmolimar,项目名称:vkitm,代码行数:21,代码来源:CacheEntry.scala


示例2: EmbeddedVKitM

//设置package包名称以及导入依赖的类
package com.github.mmolimar.vkitm.embedded

import java.util.Properties

import com.github.mmolimar.vkitm.server.{VKitMConfig, VKitMServer}
import com.github.mmolimar.vkitm.utils.TestUtils
import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.clients.producer.ProducerConfig

/**
 * Runs a [[VKitMServer]] inside the current JVM.
 *
 * The server listens on `localhost:port`; its consumer side is pointed at
 * `localhost:<port of the first broker in brokerList>`.
 *
 * @param zkConnection ZooKeeper connection string used for both the server and consumer side
 * @param brokerList   `host:port` entries, comma separated; the port of the first entry is used
 * @param port         port the VKitM server listens on (defaults to a free port)
 */
class EmbeddedVKitM(zkConnection: String,
                    brokerList: String,
                    port: Int = TestUtils.getAvailablePort) extends Logging {

  // Stays null until startup() has run; shutdown() and toString guard against that.
  private var vkitmServer: VKitMServer = null

  /**
   * Builds the server, producer and consumer properties and starts the server.
   *
   * @throws IllegalArgumentException if the first entry of `brokerList` has no port
   */
  def startup(): Unit = {
    info("Starting up VKitM server")

    val serverProps = endpointProps(port.toString)

    val producerProps = new Properties
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)

    val consumerProps = endpointProps(firstBrokerPort)

    vkitmServer = new VKitMServer(VKitMConfig.fromProps(serverProps, producerProps, consumerProps))
    vkitmServer.startup()

    info("Started embedded VKitM server")
  }

  /** Shuts the server down; does nothing when `startup()` has not been called. */
  def shutdown(): Unit =
    Option(vkitmServer).foreach(_.shutdown())

  def getPort: Int = port

  def getBrokerList: String = s"localhost:$getPort"

  /** Returns the underlying server, or `null` when `startup()` has not been called. */
  def getServer: VKitMServer = vkitmServer

  override def toString: String = {
    // Render "null" instead of throwing when the server has not been started yet.
    val configText = Option(vkitmServer).fold("null")(server => s"${server.config}")
    s"VKitM{config='$configText'}"
  }

  /** Properties describing a PLAINTEXT endpoint on localhost at the given port. */
  private def endpointProps(endpointPort: String): Properties = {
    val props = new Properties
    props.setProperty(KafkaConfig.ZkConnectProp, zkConnection)
    props.setProperty(KafkaConfig.HostNameProp, "localhost")
    props.setProperty(KafkaConfig.PortProp, endpointPort)
    props.setProperty(KafkaConfig.ListenersProp, s"PLAINTEXT://localhost:$endpointPort")
    props
  }

  /**
   * Port of the first `host:port` entry in `brokerList`.
   *
   * Splitting the whole list on ':' would return "9092,host2" for a list such as
   * "host1:9092,host2:9093", so the first entry is isolated before the port is read.
   */
  private def firstBrokerPort: String = {
    val firstBroker = brokerList.split(",").headOption.map(_.trim).getOrElse("")
    val separator = firstBroker.lastIndexOf(':')
    require(separator >= 0 && separator < firstBroker.length - 1,
      s"brokerList must start with a host:port entry, got '$brokerList'")
    firstBroker.substring(separator + 1)
  }

}
开发者ID:mmolimar,项目名称:vkitm,代码行数:62,代码来源:EmbeddedVKitM.scala


示例3: WaspKafkaWriter

//设置package包名称以及导入依赖的类
package it.agilelab.bigdata.wasp.core.kafka

import java.util.Properties

import it.agilelab.bigdata.wasp.core.WaspEvent
import it.agilelab.bigdata.wasp.core.WaspEvent.WaspMessageEnvelope
import it.agilelab.bigdata.wasp.core.models.configuration.{TinyKafkaConfig, KafkaConfigModel}
import kafka.producer.{DefaultPartitioner, KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import kafka.server.KafkaConfig


  /**
   * Sends a single message by delegating to `batchSend` with a one-element batch.
   *
   * @param topic   destination topic
   * @param key     key attached to the message
   * @param message payload to send
   */
  def send(topic: String, key: K, message: V): Unit = {
    val singleMessageBatch = Seq(message)
    batchSend(topic, key, singleMessageBatch)
  }

  /**
   * Wraps every element of `batch` in a `KeyedMessage` carrying the same topic and key,
   * then hands all of them to the producer in a single `send` call.
   *
   * @param topic destination topic
   * @param key   key shared by every message of the batch
   * @param batch payloads to send
   */
  def batchSend(topic: String, key: K, batch: Seq[V]): Unit = {
    val keyedMessages =
      for (payload <- batch) yield new KeyedMessage[K, V](topic, key, payload)
    producer.send(keyedMessages.toArray: _*)
  }

  /** Closes the underlying producer. */
  def close(): Unit = {
    producer.close()
  }

}


/** Factory methods for the `ProducerConfig` instances used by the writer. */
object WaspKafkaWriter {

  /**
   * Builds a producer configuration from the given settings.
   *
   * `request.required.acks` is always set to "1".
   *
   * @param brokers           broker addresses, joined with "," into `metadata.broker.list`
   * @param batchSize         value of `batch.num.messages`
   * @param producerType      value of `producer.type`
   * @param serializerFqcn    fully qualified class name for `serializer.class`
   * @param keySerializerFqcn fully qualified class name for `key.serializer.class`
   * @param partitionerFqcn   fully qualified class name for `partitioner.class`
   */
  def createConfig(brokers: Set[String], batchSize: Int, producerType: String, serializerFqcn: String, keySerializerFqcn: String, partitionerFqcn: String): ProducerConfig = {
    val settings = Seq(
      "metadata.broker.list" -> brokers.mkString(","),
      "serializer.class" -> serializerFqcn,
      "key.serializer.class" -> keySerializerFqcn,
      "partitioner.class" -> partitionerFqcn,
      "producer.type" -> producerType,
      "request.required.acks" -> "1",
      "batch.num.messages" -> batchSize.toString
    )

    val props = new Properties()
    settings.foreach { case (name, value) => props.put(name, value) }
    new ProducerConfig(props)
  }

  /**
   * Configuration targeting the single broker described by `config`: async producer,
   * batches of 100 messages, string encoders for keys and values, default partitioner.
   */
  def defaultConfig(config: KafkaConfig): ProducerConfig = {
    val stringEncoderName = classOf[StringEncoder].getName
    createConfig(
      brokers = Set(s"${config.hostName}:${config.port}"),
      batchSize = 100,
      producerType = "async",
      serializerFqcn = stringEncoderName,
      keySerializerFqcn = stringEncoderName,
      partitionerFqcn = classOf[DefaultPartitioner].getName
    )
  }
}
开发者ID:agile-lab-dev,项目名称:wasp,代码行数:43,代码来源:WaspKafkaWriter.scala


示例4: TestingKafkaCluster

//设置package包名称以及导入依赖的类
package com.saikocat.test

import java.io.IOException
import java.util.Properties

import kafka.server.KafkaConfig
import kafka.server.KafkaServerStartable
import kafka.utils.TestUtils

import org.apache.curator.test.TestingServer

/**
 * Pairs a Kafka broker with the ZooKeeper test server it connects to.
 *
 * @param kafkaServer broker wrapper that is started and stopped by this class
 * @param zkServer    ZooKeeper test server; it is stopped by `stop()` but not started
 *                    here (presumably it is already running when passed in -- TODO confirm)
 */
class TestingKafkaCluster(val kafkaServer: KafkaServerStartable,
                          val zkServer: TestingServer) {

  /** Starts the Kafka broker. */
  def start(): Unit = kafkaServer.startup()

  /** Configuration the broker was created with. */
  def kafkaConfig(): KafkaConfig = kafkaServer.serverConfig

  /** Broker address in `localhost:<port>` form. */
  def kafkaBrokerString(): String = s"localhost:${kafkaPort()}"

  /** Connection string of the ZooKeeper test server. */
  def zkConnectString(): String = zkServer.getConnectString()

  /** Port taken from the broker configuration. */
  def kafkaPort(): Int = kafkaServer.serverConfig.port

  /**
   * Shuts the broker down and then stops ZooKeeper.
   *
   * ZooKeeper is stopped in a `finally` so that a failing broker shutdown cannot
   * leave the ZooKeeper server running.
   */
  @throws(classOf[IOException])
  def stop(): Unit =
    try kafkaServer.shutdown()
    finally zkServer.stop()
}

/** Factory for [[TestingKafkaCluster]] instances backed by a fresh ZooKeeper test server. */
object TestingKafkaCluster {

  /**
   * Creates a ZooKeeper test server and a (not yet started) Kafka broker configured
   * to connect to it.
   *
   * If building the broker fails, the ZooKeeper server created here is stopped before
   * the failure is rethrown, so a failed call does not leave it behind.
   */
  @throws(classOf[Exception])
  def apply(): TestingKafkaCluster = {
    val zkServer = new TestingServer()
    try {
      val config: KafkaConfig = getKafkaConfig(zkServer.getConnectString())
      val kafkaServer = new KafkaServerStartable(config)
      new TestingKafkaCluster(kafkaServer, zkServer)
    } catch {
      case scala.util.control.NonFatal(cause) =>
        // Keep the original failure as the primary one; a failing stop() is attached to it.
        try zkServer.stop()
        catch { case scala.util.control.NonFatal(stopFailure) => cause.addSuppressed(stopFailure) }
        throw cause
    }
  }

  /**
   * Takes the first broker configuration produced by `TestUtils.createBrokerConfigs(1)`,
   * points it at `zkConnectString`, sets `host.name` to "localhost" and wraps it in a
   * `KafkaConfig`.
   *
   * @param zkConnectString value stored under `zookeeper.connect`
   */
  def getKafkaConfig(zkConnectString: String): KafkaConfig = {
    val brokerConfigs = TestUtils.createBrokerConfigs(1)
    assert(brokerConfigs.nonEmpty)
    val props: Properties = brokerConfigs.head
    // The generated config is expected to carry a ZooKeeper address already; it is replaced below.
    assert(props.containsKey("zookeeper.connect"))
    props.put("zookeeper.connect", zkConnectString)
    props.put("host.name", "localhost")
    new KafkaConfig(props)
  }
}
开发者ID:saikocat,项目名称:spark-sql-kafka-avroflumeevent,代码行数:62,代码来源:TestHelper.scala


示例5: EmbeddedKafka

//设置package包名称以及导入依赖的类
package nl.bigdatarepublic.streaming.embedded.adapter.kafka

import java.util.Properties

import com.typesafe.scalalogging.LazyLogging
import kafka.server.{KafkaConfig, KafkaServerStartable}
import nl.bigdatarepublic.streaming.embedded.entity.EmbeddedService

import scala.collection.JavaConverters._
import scala.reflect.io.Path
import scala.util.{Failure, Success, Try}


/**
 * Embedded Kafka broker built from a plain key/value configuration.
 *
 * @param props      broker settings, passed to `KafkaConfig`
 * @param clearState when true, the configured log directories are deleted before startup
 */
class EmbeddedKafka(props: Map[String, String], clearState: Boolean) extends LazyLogging with EmbeddedService {

  val kafka: KafkaServerStartable = new KafkaServerStartable(KafkaConfig(props.asJava))

  /** Optionally wipes the log directories, then starts the broker. */
  def start(): Unit = {
    // Clear out the existing kafka dir upon startup.
    if (clearState) {
      logger.info("Cleaning Kafka data dir before start...")
      cleanLogDirs()
    }
    logger.info("Starting embedded Kafka...")
    kafka.startup()
    logger.info("Successfully started embedded Kafka")
  }

  /** Shuts the broker down. */
  def stop(): Unit = {
    logger.info("Stopping embedded Kafka...")
    kafka.shutdown()
    logger.info("Successfully stopped embedded Kafka")
  }

  /**
   * Deletes every configured log directory on a best-effort basis: failures are
   * logged as warnings and never abort startup.
   */
  private def cleanLogDirs(): Unit =
    kafka.serverConfig.logDirs.foreach { dir =>
      Try {
        val logDir = Path(dir)
        // A directory that does not exist yet is already clean; deleteRecursively()
        // would return false for it and be misreported as a failure.
        !logDir.exists || logDir.deleteRecursively()
      } match {
        case Success(true) => logger.info("Successfully cleaned Kafka data dir...")
        case Success(false) => logger.warn("Failed to clean Kafka data dir...")
        case Failure(e) => logger.warn("Failed to clean Kafka data dir", e)
      }
    }

}



/** Factory methods for [[EmbeddedKafka]]; `clearState` is false when it is not given. */
object EmbeddedKafka {

  /** Primary factory: every other overload ends up here. */
  def apply(props: Map[String, String], clearState: Boolean): EmbeddedKafka =
    new EmbeddedKafka(props, clearState)

  def apply(props: Map[String, String]): EmbeddedKafka =
    apply(props, clearState = false)

  // Java compatibility: accept java.util.Properties and convert it to an immutable Map.
  def apply(props: Properties, clearState: Boolean): EmbeddedKafka =
    apply(props.asScala.toMap, clearState)

  def apply(props: Properties): EmbeddedKafka =
    apply(props, clearState = false)
}
开发者ID:BigDataRepublic,项目名称:bdr-engineering-stack,代码行数:55,代码来源:EmbeddedKafka.scala



注:本文中的kafka.server.KafkaConfig类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Cluster类代码示例发布时间:2022-05-23
下一篇:
Scala MatrixFactorizationModel类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap