
Scala Properties Class Code Examples


This article collects typical usage examples of the java.util.Properties class in Scala. If you are wondering how the Properties class is used from Scala, or simply looking for working examples, the curated code samples below should help.



The 20 code examples below demonstrate the Properties class, ordered roughly by popularity.
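
Before diving in, here is a minimal, self-contained sketch of the core Properties API that most of the examples build on — setProperty, store, load, and getProperty with a default. The file name is illustrative:

import java.io.{FileInputStream, FileOutputStream}
import java.util.Properties

object PropertiesBasics extends App {
  val props = new Properties()
  props.setProperty("app.name", "demo")    // keys and values are always strings
  props.setProperty("app.threads", "4")

  // Write the properties to disk, then read them back.
  val out = new FileOutputStream("demo.properties")
  try props.store(out, "demo settings") finally out.close()

  val loaded = new Properties()
  val in = new FileInputStream("demo.properties")
  try loaded.load(in) finally in.close()

  println(loaded.getProperty("app.threads", "1")) // falls back to "1" if absent
}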

Example 1: Access

// Package declaration and imports
package hu.blackbelt.cd.bintray.deploy

import java.nio.file.{Files, StandardCopyOption}
import java.util.{Properties, UUID}

import awscala.s3.S3
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.model.GetObjectRequest
import hu.blackbelt.cd.bintray.VFS.FS

object Access {
  val bintray_organization = "bintray.organization"
  val bintray_user = "bintray.user"
  val bintray_apikey = "bintray.apikey"
  val aws_accessKeyId = "aws.accessKeyId"
  val aws_secretKey = "aws.secretKey"


  def collect = {
    implicit val s3 = S3()(com.amazonaws.regions.Region.getRegion(Regions.EU_CENTRAL_1))



    val destination = FS.getPath(s"/tmp/${UUID.randomUUID().toString}")
    Files.createDirectories(destination)
    val s3Object = s3.getObject(new GetObjectRequest("blackbelt-secrets", "bintray-deploy/access.properties"))
    Files.copy(s3Object.getObjectContent, destination, StandardCopyOption.REPLACE_EXISTING)

    import scala.collection.JavaConverters._
    val prop = new Properties()
    prop.load(Files.newInputStream(destination))
    prop.entrySet().asScala.foreach {
      (entry) => {
        sys.props += ((entry.getKey.asInstanceOf[String], entry.getValue.asInstanceOf[String]))
      }
    }
  }
} 
Author: tsechov, Project: s3-bintray-deploy, Lines: 39, Source: Access.scala
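
A hypothetical caller of Access.collect — assuming valid AWS credentials and that the referenced bucket and key exist — sees every entry from access.properties surface as a system property:

object AccessUsage extends App {
  Access.collect                                   // downloads and loads the properties
  val apiKey: Option[String] = sys.props.get(Access.bintray_apikey)
  println(apiKey.isDefined)
}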


Example 2: HelloWorldSpring

// Package declaration and imports
package helloworld

import java.io.FileInputStream
import java.util.Properties

import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.support.{DefaultListableBeanFactory, PropertiesBeanDefinitionReader}

object HelloWorldSpring extends App {
  val factory: BeanFactory = getBeanFactory
  val mr: MessageRenderer = factory.getBean("renderer").asInstanceOf[MessageRenderer]
  val mp: MessageProvider = factory.getBean("provider").asInstanceOf[MessageProvider]
  mr.setMessageProvider(mp)
  mr.render

  @throws(classOf[Exception])
  private def getBeanFactory: BeanFactory = {
    val factory: DefaultListableBeanFactory = new DefaultListableBeanFactory
    val rdr: PropertiesBeanDefinitionReader = new PropertiesBeanDefinitionReader(factory)
    val props: Properties = new Properties
    props.load(new FileInputStream("springdi_scala/src/helloworld/beans.properties"))
    rdr.registerBeanDefinitions(props)
    factory
  }
} 
Author: BBK-PiJ-2015-67, Project: sdp-portfolio, Lines: 27, Source: HelloWorldSpring.scala


Example 3: SimpleKafkaProducer

// Package declaration and imports
package com.example

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.json4s.{DefaultFormats, jackson}

// `Socket` is a project-local host:port type, not java.net.Socket; its definition is not part of this excerpt.
class SimpleKafkaProducer(kafkaSocket: Socket, topic: String, brokers: Int = 1) {

  private val serializer = "org.apache.kafka.common.serialization.StringSerializer"

  private def configuration = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaSocket.toString())
    props.put("key.serializer", serializer)
    props.put("value.serializer", serializer)
    props
  }

  def send[T <: AnyRef](message: T) = {
    implicit val serialization = jackson.Serialization
    implicit val formats = DefaultFormats

    val producer = new KafkaProducer[String, String](configuration)
    val jsonMessage = serialization.write[T](message)
    val data = new ProducerRecord[String, String](topic, jsonMessage)

    producer.send(data)
    producer.close()
  }
} 
Author: frossi85, Project: financial-statistics-crawler, Lines: 31, Source: SimpleKafkaProducer.scala
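
A sketch of driving this producer, assuming a broker on localhost:9092. The Socket type here is a hypothetical stand-in for the project-local one, which the excerpt does not include:

// Hypothetical stand-in; the real project type is not shown in the excerpt.
case class Socket(host: String, port: Int) { override def toString: String = s"$host:$port" }

case class Trade(symbol: String, price: Double)

object ProducerUsage extends App {
  val producer = new SimpleKafkaProducer(Socket("localhost", 9092), topic = "trades")
  producer.send(Trade("ACME", 42.0)) // serialized to JSON and sent to the "trades" topic
}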


Example 4: Mailer

// Package declaration and imports
package org.kirhgoff.lastobot

import java.util.Properties
import javax.mail.{Message, Session}
import javax.mail.internet.{InternetAddress, MimeMessage}

import scala.io.Source


object Mailer {
  val host = "smtp.gmail.com"
  val port = "587"

  val address = "[email protected]"
  val username = "lastobot"
  val password = Source.fromFile(System.getProperty("user.home")
    + "/.lastobot/.mail").getLines.mkString

  def sendMail(text:String, subject:String) = {
    val properties = new Properties()
    properties.put("mail.smtp.port", port)
    properties.put("mail.smtp.auth", "true")
    properties.put("mail.smtp.starttls.enable", "true")

    val session = Session.getDefaultInstance(properties, null)
    val message = new MimeMessage(session)
    message.addRecipient(Message.RecipientType.TO, new InternetAddress(address))
    message.setSubject(subject)
    message.setContent(text, "text/html")

    val transport = session.getTransport("smtp")
    transport.connect(host, username, password)
    transport.sendMessage(message, message.getAllRecipients)
  }

  def main(args:Array[String]) = {
    sendMail("aaaa", "bbb")
  }
} 
Author: kirhgoff, Project: lastobot, Lines: 40, Source: Mailer.scala


Example 5: ReadyKafkaProducer

// Package declaration and imports
package com.bencassedy.readykafka.producer

import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}


class ReadyKafkaProducer {
  case class KafkaProducerConfigs(brokerList: String = "127.0.0.1:9092") {
    val properties = new Properties()
    properties.put("bootstrap.servers", brokerList)
    properties.put("key.serializer", classOf[StringSerializer])
    properties.put("value.serializer", classOf[StringSerializer])
//    properties.put("serializer.class", classOf[StringDeserializer])
//    properties.put("batch.size", 16384)
//    properties.put("linger.ms", 1)
//    properties.put("buffer.memory", 33554432)
  }

  val producer = new KafkaProducer[String, String](KafkaProducerConfigs().properties)

  def produce(topic: String, messages: Iterable[String]): Unit = {
    messages.foreach { m =>
      producer.send(new ProducerRecord[String, String](topic, m))
    }
    producer.close(100L, TimeUnit.MILLISECONDS)
  }
} 
Author: bencassedy, Project: ready-kafka, Lines: 31, Source: ReadyKafkaProducer.scala
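
A usage sketch, assuming a broker on 127.0.0.1:9092. Note that produce closes its producer after sending, so each ReadyKafkaProducer instance is good for one batch:

object ReadyKafkaProducerUsage extends App {
  val kafka = new ReadyKafkaProducer
  kafka.produce("example-topic", Seq("first message", "second message"))
}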


Example 6: fillFromEnv

// Package declaration and imports
package hu.blackbelt.cd.bintray.deploy

import java.io.{File, FileInputStream}
import java.util.Properties

import org.scalatest.{BeforeAndAfter, Suite}

trait Creds extends BeforeAndAfter {
  this: Suite =>

  def fillFromEnv(prop: Properties) = {
    def put(key: String) = sys.env.get(key.replace('.','_')).map(prop.put(key.replace('_','.'), _))
    put(Access.aws_accessKeyId)
    put(Access.aws_secretKey)
    put(Access.bintray_organization)
    put(Access.bintray_user)
    put(Access.bintray_apikey)
  }

  before {
    import scala.collection.JavaConverters._
    val prop = new Properties()
    val propsFile = new File("env.properties")
    if (propsFile.exists()) {
      prop.load(new FileInputStream(propsFile))
    } else {
      fillFromEnv(prop)
    }
    prop.entrySet().asScala.foreach {
      (entry) => {
        sys.props += ((entry.getKey.asInstanceOf[String], entry.getValue.asInstanceOf[String]))
      }
    }
  }


} 
Author: tsechov, Project: s3-bintray-deploy, Lines: 38, Source: Creds.scala
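
A minimal spec mixing in Creds — the before hook loads env.properties (or falls back to environment variables) into system properties ahead of each test. FunSuite is just one ScalaTest style that satisfies the Suite self-type:

import org.scalatest.FunSuite

class DeploySpec extends FunSuite with Creds {
  test("credentials are exposed as system properties") {
    assert(sys.props.get(Access.bintray_user).nonEmpty)
  }
}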


Example 7: Generator

// Package declaration and imports
package data.processing.kafkagenerator

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import com.github.andr83.scalaconfig._
import com.yammer.metrics.core.{MetricName, MetricsRegistry}
import data.processing.avro.AvroEncoder

import scala.concurrent.forkjoin.ThreadLocalRandom


object Generator {

  val metricsRegistry = new MetricsRegistry

  val config = ConfigFactory.load()
  val props = config.getConfig("kafka-client").as[Properties]
  val topic = config.getString("kafka-client.topic")
  val numberOfUsers = config.getInt("generator.number.of.users")
  val urls = config.getStringList("generator.urls")
  val eventTypes = config.getStringList("generator.event.types")
  val throughput = config.getInt("generator.throughput")

  val avroEncoder = new AvroEncoder("/event-record.json")

  def generateEvent() = {
    val id = ThreadLocalRandom.current().nextLong()
    val ts = java.lang.System.currentTimeMillis()
    val userId = ThreadLocalRandom.current().nextInt(numberOfUsers).toHexString
    val url = urls.get(ThreadLocalRandom.current().nextInt(urls.size()))
    val eventType = eventTypes.get(ThreadLocalRandom.current().nextInt(eventTypes.size()))

    (id, avroEncoder.encode((id, ts, userId, url, eventType)))
  }

  def main(args: Array[String]): Unit = {
    val meter = metricsRegistry.newMeter(new MetricName("", "", ""), "", TimeUnit.SECONDS)
    val producer = new KafkaProducer[String, Array[Byte]](props)
    while(true) {
      if (meter.meanRate < throughput) {
        meter.mark()
        val event = generateEvent()
        producer.send(new ProducerRecord[String, Array[Byte]](topic, event._1.toString, event._2))
      }
      else {
        Thread.sleep(1)
      }
    }
    // Note: while (true) above never exits, so flush() and close() below are
    // unreachable as written; a shutdown condition would be needed to reach them.
    producer.flush()
    producer.close()
  }
} 
Author: ipogudin, Project: data-processing-examples, Lines: 56, Source: Generator.scala


Example 8: fromData

// Package declaration and imports
package akka.stream.alpakka.geode.internal.pdx

import java.util.Properties

import org.apache.geode.cache.Declarable
import org.apache.geode.pdx.{PdxReader, PdxSerializer, PdxWriter}


// The original class declaration is missing from this excerpt; the snippet comes from
// alpakka's DelegatingPdxSerializer. A plausible reconstruction of the surrounding class:
class DelegatingPdxSerializer(
    isPdxCompat: (Class[_], Class[_]) => Boolean,
    serializers: scala.collection.concurrent.TrieMap[Class[_], PdxSerializer] =
      scala.collection.concurrent.TrieMap.empty
) extends PdxSerializer with Declarable {

  private def register(ser: PdxSerializer, clazz: Class[_]): Unit =
    serializers.putIfAbsent(clazz, ser)

  override def toData(o: AnyRef, out: PdxWriter): Boolean =
    serializers.get(o.getClass).exists(_.toData(o, out))

  override def fromData(clazz: Class[_], in: PdxReader): AnyRef =
    serializers
      .get(clazz)
      .map(_.fromData(clazz, in))
      .orElse(serializers.collectFirst {
        case (c, ser) if isPdxCompat(c, clazz) =>
          val v = ser.fromData(clazz, in)
          if (v != null) register(ser, clazz)
          v
      })
      .orNull

  override def init(props: Properties): Unit = {}
}
Author: akka, Project: alpakka, Lines: 23, Source: DelegatingPdxSerializer.scala


Example 9: ClickhouseConnectionFactory

// Package declaration and imports
package io.clickhouse.ext

import java.util.Properties
import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties

object ClickhouseConnectionFactory extends Serializable{

  private val dataSources = scala.collection.mutable.Map[(String, Int), ClickHouseDataSource]()

  def get(host: String, port: Int = 8123): ClickHouseDataSource ={
    dataSources.get((host, port)) match {
      case Some(ds) =>
        ds
      case None =>
        val ds = createDatasource(host, port = port)
        dataSources += ((host, port) -> ds)
        ds
    }
  }

  private def createDatasource(host: String, dbO: Option[String] = None, port: Int = 8123) = {
    val props = new Properties()
    dbO map {db => props.setProperty("database", db)}

    val clickHouseProps = new ClickHouseProperties(props)
    new ClickHouseDataSource(s"jdbc:clickhouse://$host:$port", clickHouseProps)
  }
} 
Author: DmitryBe, Project: spark-clickhouse, Lines: 30, Source: ClickhouseConnectionFactory.scala
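
Fetching the cached data source and running a query over plain JDBC — a sketch assuming a ClickHouse server on localhost:8123:

object ClickhouseUsage extends App {
  val ds = ClickhouseConnectionFactory.get("localhost") // cached per (host, port)
  val conn = ds.getConnection
  try {
    val rs = conn.createStatement().executeQuery("SELECT 1")
    while (rs.next()) println(rs.getInt(1))
  } finally conn.close()
}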


Example 10: DataDriver

// Package declaration and imports
package org.hpi.esb.datasender

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.hpi.esb.commons.config.Configs
import org.hpi.esb.commons.util.Logging
import org.hpi.esb.datasender.config._
import org.hpi.esb.datasender.output.writers.DatasenderRunResultWriter

import scala.io.Source

class DataDriver() extends Logging {

  private val topics = Configs.benchmarkConfig.sourceTopics
  private val config = ConfigHandler.config
  private val dataReader = createDataReader(config.dataReaderConfig)
  private val kafkaProducerProperties = createKafkaProducerProperties(config.kafkaProducerConfig)
  private val kafkaProducer = new KafkaProducer[String, String](kafkaProducerProperties)
  private val resultHandler = new DatasenderRunResultWriter(config, Configs.benchmarkConfig, kafkaProducer)
  private val dataProducer = createDataProducer(kafkaProducer, dataReader, resultHandler)

  def run(): Unit = {
    dataProducer.execute()
  }

  def createKafkaProducerProperties(kafkaProducerConfig: KafkaProducerConfig): Properties = {

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerConfig.bootstrapServers.get)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerConfig.keySerializerClass.get)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerConfig.valueSerializerClass.get)
    props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerConfig.acks.get)
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerConfig.batchSize.get.toString)
    props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfig.lingerTime.toString)
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProducerConfig.bufferMemorySize.toString)
    props
  }

  def createDataReader(dataReaderConfig: DataReaderConfig): DataReader = {
    new DataReader(Source.fromFile(dataReaderConfig.dataInputPath.get),
      dataReaderConfig.columns.get,
      dataReaderConfig.columnDelimiter.get,
      dataReaderConfig.dataColumnStart.get,
      dataReaderConfig.readInRam)
  }

  def createDataProducer(kafkaProducer: KafkaProducer[String, String], dataReader: DataReader,
                         resultHandler: DatasenderRunResultWriter): DataProducer = {

    val numberOfThreads = config.dataSenderConfig.numberOfThreads.get
    val sendingInterval = Configs.benchmarkConfig.sendingInterval
    val sendingIntervalTimeUnit = Configs.benchmarkConfig.getSendingIntervalTimeUnit()
    val duration = Configs.benchmarkConfig.duration
    val durationTimeUnit = Configs.benchmarkConfig.getDurationTimeUnit()
    val singleColumnMode = config.dataSenderConfig.singleColumnMode

    new DataProducer(resultHandler, kafkaProducer, dataReader, topics, numberOfThreads,
      sendingInterval, sendingIntervalTimeUnit, duration, durationTimeUnit, singleColumnMode)
  }
} 
Author: BenReissaus, Project: EnterpriseStreamingBenchmark, Lines: 62, Source: DataDriver.scala


Example 11: EmailParser

// Package declaration and imports
package uk.pkerrigan.dmarcparser

import java.io.ByteArrayInputStream
import java.nio.charset.CodingErrorAction
import java.util.Properties
import java.util.zip.{GZIPInputStream, ZipInputStream}
import javax.activation.DataSource
import javax.mail.Session
import javax.mail.internet.MimeMessage
import scala.collection.JavaConverters._

import org.apache.commons.mail.util.MimeMessageParser
import uk.pkerrigan.dmarcparser.report.Feedback

import scala.io._

class EmailParser(parser: ParserTrait = new Parser()) extends EmailParserTrait{

  implicit val codec = Codec("UTF-8")
  codec.onMalformedInput(CodingErrorAction.REPLACE)
  codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

  def parseEmail(email: String): Option[Feedback] = {
    val s = Session.getDefaultInstance(new Properties())
    val is = new ByteArrayInputStream(email.getBytes)
    val message = new MimeMessage(s, is)
    val messageParser = new MimeMessageParser(message).parse()

    messageParser.getAttachmentList.asScala.headOption.flatMap(extract)
  }

  private def extract(a: DataSource): Option[Feedback] = a.getContentType match {
    case "application/gzip" | "application/x-gzip"           => extractGzip(a)
    case "application/zip" | "application/x-zip-compressed"  => extractZip(a)
    case _                                                    => None
  }

  private def extractZip(a: DataSource): Option[Feedback] = {
    val zip = new ZipInputStream(a.getInputStream)
    zip.getNextEntry
    val rawXml = Source.fromInputStream(zip).mkString
    if (rawXml == "") None else Some(parser.parse(rawXml))
  }
  private def extractGzip(a: DataSource): Option[Feedback] = {
    val zip = new GZIPInputStream(a.getInputStream)
    val rawXml = Source.fromInputStream(zip).mkString
    if (rawXml == "") None else Some(parser.parse(rawXml))
  }
} 
Author: patrickkerrigan, Project: dmarc-parser, Lines: 52, Source: EmailParser.scala
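
Feeding a raw RFC 822 message to the parser; an attachment in one of the four supported content types yields Some(Feedback). The file path is illustrative:

import uk.pkerrigan.dmarcparser.report.Feedback

object EmailParserUsage extends App {
  val rawEmail = scala.io.Source.fromFile("dmarc-report.eml").mkString
  val report: Option[Feedback] = new EmailParser().parseEmail(rawEmail)
  println(report.isDefined)
}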


Example 12:

// Package declaration and imports
package com.github.simonthecat.eventdrivenorders.productservice

import java.util.Properties


package object kafka {

  val orderConsumerCfg: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "product.consumers")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("session.timeout.ms", "30000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  val confirmationProducerCfg: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("acks", "all")
    props.put("retries", "0")
    props.put("batch.size", "16384")
    props.put("linger.ms", "1")
    props.put("buffer.memory", "33554432")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }
} 
Author: simonko91, Project: event-driven-orders, Lines: 33, Source: package.scala
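
These Properties plug straight into the standard Kafka client constructors — a sketch assuming a broker on localhost:9092:

import com.github.simonthecat.eventdrivenorders.productservice.kafka._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaCfgUsage extends App {
  val consumer = new KafkaConsumer[String, String](orderConsumerCfg)
  consumer.subscribe(java.util.Collections.singletonList("orders"))

  val producer = new KafkaProducer[String, String](confirmationProducerCfg)
  producer.send(new ProducerRecord[String, String]("confirmations", "order-1", "confirmed"))
  producer.close()
}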


Example 13:

// Package declaration and imports
package com.github.simonthecat.eventdrivenorders.orderservice

import java.util.Properties


package object kafka {

  val orderConsumerCfg: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "order.consumers")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("session.timeout.ms", "30000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  val storeConfirmationConsumer: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "order.consumers")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("session.timeout.ms", "30000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  val producerCfg: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("acks", "all")
    props.put("retries", "0")
    props.put("batch.size", "16384")
    props.put("linger.ms", "1")
    props.put("buffer.memory", "33554432")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }
} 
Author: simonko91, Project: event-driven-orders, Lines: 45, Source: package.scala


Example 14:

// Package declaration and imports
package com.github.eventdrivenorders.api

import java.util.Properties


package object kafka {

  val producerCfg: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("acks", "all")
    props.put("retries", "0")
    props.put("batch.size", "16384")
    props.put("linger.ms", "1")
    props.put("buffer.memory", "33554432")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  val orderStatusConsumer: Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "api.consumers")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("session.timeout.ms", "30000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

} 
Author: simonko91, Project: event-driven-orders, Lines: 34, Source: package.scala


Example 15: Helpers

// Package declaration and imports
package com.github.mmolimar.vkitm.utils

import java.util.Properties
import java.util.concurrent.{CancellationException, TimeUnit, Future => JFuture}

import com.typesafe.config.Config
import org.jboss.netty.util.{HashedWheelTimer, Timeout, TimerTask}

import scala.concurrent.{Future, Promise}
import scala.util.Try

object Helpers {

  private val pollIntervalMs = 50L
  private val timer = new HashedWheelTimer(pollIntervalMs, TimeUnit.MILLISECONDS)

  implicit class JFutureHelpers[T](jf: JFuture[T]) {
    def asScala: Future[T] = {
      val promise = Promise[T]()

      def checkCompletion(): Unit = {
        if (jf.isCancelled) {
          promise.failure(new CancellationException())
        } else if (jf.isDone) {
          promise.complete(Try(jf.get))
        } else {
          scheduleTimeout()
        }
        ()
      }

      def scheduleTimeout(): Unit = {
        timer.newTimeout(new TimerTask {
          override def run(timeout: Timeout): Unit = checkCompletion()
        }, pollIntervalMs, TimeUnit.MILLISECONDS)
        ()
      }

      checkCompletion()
      promise.future
    }
  }

  implicit def propsFromConfig(config: Config): Properties = {
    import scala.collection.JavaConversions._

    val props = new Properties()

    val map: Map[String, Object] = config.entrySet().map({ entry =>
      entry.getKey -> entry.getValue.unwrapped()
    })(collection.breakOut)

    props.putAll(map)
    props
  }

} 
Author: mmolimar, Project: vkitm, Lines: 58, Source: Helpers.scala
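
Both helpers in action — awaiting a wrapped java.util.concurrent.Future, and materializing Properties from a Typesafe Config via the implicit conversion (the config string is illustrative):

import java.util.concurrent.{Callable, Executors}
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import Helpers._

object HelpersUsage extends App {
  val pool = Executors.newSingleThreadExecutor()
  val jf = pool.submit(new Callable[Int] { def call(): Int = 42 })
  println(Await.result(jf.asScala, 5.seconds)) // JFuture -> scala.concurrent.Future

  val props: java.util.Properties =
    ConfigFactory.parseString("""bootstrap.servers = "localhost:9092"""")
  println(props.getProperty("bootstrap.servers"))
  pool.shutdown()
}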


Example 16: EmbeddedVKitM

// Package declaration and imports
package com.github.mmolimar.vkitm.embedded

import java.util.Properties

import com.github.mmolimar.vkitm.server.{VKitMConfig, VKitMServer}
import com.github.mmolimar.vkitm.utils.TestUtils
import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.clients.producer.ProducerConfig

class EmbeddedVKitM(zkConnection: String,
                    brokerList: String,
                    port: Int = TestUtils.getAvailablePort) extends Logging {

  private var vkitmServer: VKitMServer = null

  def startup() {
    info("Starting up VKitM server")

    val serverProps = new Properties
    serverProps.setProperty(KafkaConfig.ZkConnectProp, zkConnection)
    serverProps.setProperty(KafkaConfig.HostNameProp, "localhost")
    serverProps.setProperty(KafkaConfig.PortProp, port.toString)
    serverProps.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:" + port)

    val producerProps = new Properties
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)

    val brokerPort = brokerList.split(":")(1)
    val consumerProps = new Properties
    consumerProps.setProperty(KafkaConfig.ZkConnectProp, zkConnection)
    consumerProps.setProperty(KafkaConfig.HostNameProp, "localhost")
    consumerProps.setProperty(KafkaConfig.PortProp, brokerPort)
    consumerProps.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:" + brokerPort)

    vkitmServer = new VKitMServer(VKitMConfig.fromProps(serverProps, producerProps, consumerProps))
    vkitmServer.startup()

    info("Started embedded VKitM server")
  }

  def shutdown() {
    vkitmServer.shutdown()
  }

  def getPort: Int = port

  def getBrokerList: String = "localhost:" + getPort

  def getServer: VKitMServer = vkitmServer

  override def toString: String = {
    val sb: StringBuilder = StringBuilder.newBuilder
    sb.append("VKitM{")
    sb.append("config='").append(vkitmServer.config).append('\'')
    sb.append('}')

    sb.toString
  }

} 
Author: mmolimar, Project: vkitm, Lines: 62, Source: EmbeddedVKitM.scala


Example 17: SimpleKafkaConsumer

// Package declaration and imports
package com.example

import java.nio.charset.StandardCharsets
import java.util.Properties
import kafka.consumer.ConsumerConfig
import org.json4s.{DefaultFormats, jackson}
import scala.collection.immutable.HashMap

// `Socket` is again a project-local host:port type, not java.net.Socket; its definition is not included in this excerpt.
class SimpleKafkaConsumer(kafkaSocket: Socket, zooKeeperSocket: Socket, groupId: String, topic: String) {

  private def configuration = {
    val deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    val props = new Properties()
    props.put("bootstrap.servers", kafkaSocket.toString())
    props.put("key.deserializer", deserializer)
    props.put("value.deserializer", deserializer)
    props.put("group.id", groupId)
    props.put("consumer.id", "consumer0")
    props.put("consumer.timeout", "-1")
    props.put("auto.offset.reset", "smallest")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("zookeeper.session.timeout.ms", "6000")
    props.put("zookeeper.connect", zooKeeperSocket.toString())
    props.put("num.consumer.fetchers", "2")
    props.put("rebalance.max.retries", "4")
    props.put("auto.commit.interval.ms", "1000")
    props
  }

  private val consumer = kafka.consumer.Consumer.create(new ConsumerConfig(configuration))

  def read[T <: AnyRef]()(implicit m: Manifest[T]): Iterable[T] = {
    implicit val serialization = jackson.Serialization
    implicit val formats = DefaultFormats

    val topicCountMap = HashMap(topic -> 1)
    val consumerMap = consumer.createMessageStreams(topicCountMap)
    val stream = consumerMap.get(topic).get(0)
    val iterator = stream.iterator()

    iterator.map(x => serialization.read[T](new String(x.message(), StandardCharsets.UTF_8))).toStream
  }

  def shutdown() = {
    consumer.shutdown()
  }
} 
Author: frossi85, Project: financial-statistics-collector, Lines: 48, Source: SimpleKafkaConsumer.scala
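
A consuming sketch mirroring the producer example — the Socket stand-in is hypothetical, and a local broker plus ZooKeeper are assumed:

// Hypothetical stand-in; the real project type is not shown in the excerpt.
case class Socket(host: String, port: Int) { override def toString: String = s"$host:$port" }

case class Trade(symbol: String, price: Double)

object ConsumerUsage extends App {
  val consumer = new SimpleKafkaConsumer(Socket("localhost", 9092), Socket("localhost", 2181),
    groupId = "stats", topic = "trades")
  consumer.read[Trade]().take(10).foreach(println) // lazily pulls ten JSON messages
  consumer.shutdown()
}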


Example 18: EmbeddedKafka

// Package declaration and imports
package com.softwaremill.embeddedkafka

import java.io.IOException
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.util.Properties

import akka.actor.Actor
import utils.embeddedkafka.KafkaLocal

class EmbeddedKafka extends Actor {

	var embeddedKafka: Option[KafkaLocal] = None

	override def preStart(): Unit = {
		super.preStart()
		deleteKafkaData()
		embeddedKafka = Some(initEmbeddedKafka())
		context.parent ! "Start"
	}

	override def postStop(): Unit = {
		embeddedKafka.foreach(_.stop())
		super.postStop()
	}

	def initEmbeddedKafka() = {
		val kafkaProperties = new Properties()
		val zkProperties = new Properties()
		kafkaProperties.load(getClass.getResourceAsStream("/kafkalocal.properties"))
		zkProperties.load(getClass.getResourceAsStream("/zklocal.properties"))
		new KafkaLocal(kafkaProperties, zkProperties)
	}

	override def receive: Actor.Receive = {
		case _ =>
	}

	def deleteKafkaData(): Unit = {
		val path = Paths.get("./data")
		Files.walkFileTree(path, new FileVisitor[Path] {
			override def visitFileFailed(file: Path, exc: IOException) = FileVisitResult.CONTINUE

			override def visitFile(file: Path, attrs: BasicFileAttributes) = {
				Files.delete(file)
				FileVisitResult.CONTINUE
			}

			override def preVisitDirectory(dir: Path, attrs: BasicFileAttributes) = FileVisitResult.CONTINUE

			override def postVisitDirectory(dir: Path, exc: IOException) = {
				Files.delete(dir)
				FileVisitResult.CONTINUE
			}
		})
	}
} 
Author: jw3, Project: example-kafka-tweets, Lines: 58, Source: EmbeddedKafka.scala


Example 19: KafkaUtilities

// Package declaration and imports
package com.fortysevendeg.log.utils

import java.util.Properties

import kafka.admin.AdminUtils
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkConnection
import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaUtilities {

  def createKafkaProducer(): Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
//    props.put("partitioner.class", "com.fortysevendeg.biglog.SimplePartitioner")
//    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
//    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("producer.type", "async")
    props.put("request.required.acks", "1")

    val config = new ProducerConfig(props)
    new Producer[String, String](config)
  }

  def createKafkaConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    new KafkaConsumer[String, String](props)
  }

  def createTopicIntoKafka(topic: String, numPartitions: Int, replicationFactor: Int): Unit = {
    val zookeeperConnect = "localhost:2181"
    val sessionTimeoutMs = 10 * 1000
    val connectionTimeoutMs = 8 * 1000

    val zkClient = ZkUtils.createZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs)
    val zkUtils = new ZkUtils(zkClient, zkConnection = new ZkConnection(zookeeperConnect), isSecure = false)
    AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, new Properties)
    zkClient.close()
  }

  def d(kafkaProducer: Producer[String, String], topic: String, message: String) = {
    kafkaProducer.send(new KeyedMessage[String, String](topic, message))
  }

} 
Author: javipacheco, Project: spark-android-log, Lines: 52, Source: KafkaUtilities.scala
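
Tying the utilities together — create a topic, then publish through the legacy producer API (assumes ZooKeeper on localhost:2181 and a broker on localhost:9092):

object KafkaUtilitiesUsage extends App {
  KafkaUtilities.createTopicIntoKafka("logs", numPartitions = 1, replicationFactor = 1)
  val producer = KafkaUtilities.createKafkaProducer()
  KafkaUtilities.d(producer, "logs", "hello from scala")
  producer.close()
}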


Example 20: AppConfigs

// Package declaration and imports
package com.groupon.dse.configs

import java.util.Properties

import org.apache.spark.storage.StorageLevel

object AppConfigs {
  val SparkReceivers = ("spark.num.receivers", "1")
  val SparkStorageLevel = ("spark.storage.level", "MEMORY_AND_DISK_SER_2")

  val Topics = ("topics", "")
  val TopicsBlackList = ("topics.blacklist", "")
  val TopicsEnableBlockingConsumption = ("topic.consumption.blocking", "false")
  val TopicConsumptionPolicy = ("topic.consumption.policy", "OFFSET")
  val TopicConsumptionOffsetThreshold = ("topic.consumption.offset.threshold", "0")
  val TopicConsumptionTimeThresholdMs = ("topic.consumption.time.threshold.ms", "1000")
  val TopicFetchSizeBytes = ("topic.fetch.size.bytes", "1048576")
  val TopicRepartitionFactor = ("topic.repartition.factor", "1")
  val TopicStartOffset = ("topic.start.offset", "-1") //-1: Max, -2: Min, Other: Actual offset value

  val PartitionRefreshIntervalMs = ("partition.refresh.interval.ms", "30000")
  val PartitionWarmUpRefreshIntervalMs = ("partition.warmup.refresh.interval.ms", "10000")
  val ReceiverRestIntervalOnFailMs = ("receiver.rest.interval.fail.ms", "2500")
  val ReceiverRestIntervalOnSuccessMs = ("receiver.rest.interval.success.ms", "100")

  val KafkaBrokerConnect = ("kafka.broker.zk.connect", "")
  val KafkaSocketTimeoutMs = ("kafka.socket.timeout", "10000")
  val KafkaSocketBufferSizeBytes = ("kafka.socket.buffer.size", "1048576")
  val KafkaZkSessionTimeoutMs = ("kafka.zk.session.timeout.ms", "10000")
  val KafkaZkConnectionTimeoutMs = ("kafka.zk.connection.timeout.ms", "10000")

  val StateControllerType = ("statecontroller.type", "MEMORY")
  val ZookeeperStateControllerConnect = ("statecontroller.zk.connect", "")
  val ZookeeperStateControllerRoot = ("statecontroller.zk.root", "/baryon")
  val ZookeeperStateControllerConnTimeoutMs = ("statecontroller.zk.conn.timeout.ms", "120000")
  val ZookeeperStateControllerSessionTimeoutMs = ("statecontroller.zk.session.timeout.ms", "60000")

  val TopicFetcherType = ("topics.fetcher.type", "LOCAL")
  val HDFSTopicSource = ("topics.fetcher.hdfs.source", "")
  val HTTPTopicSource = ("topics.fetcher.http.source", "")

  
  def validatedBooleanConfig(
                              properties: Properties,
                              propertyName: String,
                              propertyDefault: String)
  : Boolean = {
    properties.getProperty(propertyName, propertyDefault) match {
      case "true" => true
      case "false" => false
      case _ => throw InvalidConfigException(s"$propertyName should be set to true or false")
    }
  }

  case class MissingConfigException(message: String) extends Exception(message)

  case class InvalidConfigException(message: String) extends Exception(message)

} 
Author: groupon, Project: baryon, Lines: 60, Source: AppConfigs.scala
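
How the (key, default) tuples are meant to be consumed — validatedBooleanConfig accepts only "true" or "false" and throws InvalidConfigException otherwise:

import java.util.Properties

object AppConfigsUsage extends App {
  val props = new Properties()
  props.setProperty(AppConfigs.TopicsEnableBlockingConsumption._1, "true")

  val blocking = AppConfigs.validatedBooleanConfig(
    props,
    AppConfigs.TopicsEnableBlockingConsumption._1,
    AppConfigs.TopicsEnableBlockingConsumption._2)
  println(blocking) // prints: true
}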



Note: The java.util.Properties class examples above were compiled from open-source projects hosted on GitHub, MSDocs, and similar platforms. Copyright in each snippet remains with its original authors; consult the corresponding project's license before redistributing or reusing the code. Please do not republish without permission.

