本文整理汇总了Scala中java.util.Properties类的典型用法代码示例。如果您正苦于以下问题：Scala Properties类的具体用法？Scala Properties怎么用？Scala Properties使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Properties类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Access
//设置package包名称以及导入依赖的类
package hu.blackbelt.cd.bintray.deploy
import java.nio.file.{Files, StandardCopyOption}
import java.util.{Properties, UUID}
import awscala.s3.S3
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.model.GetObjectRequest
import hu.blackbelt.cd.bintray.VFS.FS
/** Fetches deployment credentials from S3 and publishes them as JVM system properties. */
object Access {
  // Names of the credential properties handled by this project.
  val bintray_organization = "bintray.organization"
  val bintray_user = "bintray.user"
  val bintray_apikey = "bintray.apikey"
  val aws_accessKeyId = "aws.accessKeyId"
  val aws_secretKey = "aws.secretKey"

  /**
   * Downloads `bintray-deploy/access.properties` from the `blackbelt-secrets` bucket
   * (region EU_CENTRAL_1) to a randomly named path under `/tmp` on `FS`, loads it as a
   * properties file and copies every string entry into `sys.props`.
   *
   * Both the S3 content stream and the local input stream are closed once they have been
   * consumed, even when copying or loading fails.
   */
  def collect: Unit = {
    import scala.collection.JavaConverters._

    implicit val s3 = S3()(com.amazonaws.regions.Region.getRegion(Regions.EU_CENTRAL_1))
    val destination = FS.getPath(s"/tmp/${UUID.randomUUID().toString}")
    // Creates `destination` itself as an (empty) directory; REPLACE_EXISTING below swaps it
    // for the downloaded file. As a side effect this guarantees the parent directory exists.
    Files.createDirectories(destination)

    val s3Object = s3.getObject(new GetObjectRequest("blackbelt-secrets", "bintray-deploy/access.properties"))
    val content = s3Object.getObjectContent
    try {
      Files.copy(content, destination, StandardCopyOption.REPLACE_EXISTING)
    } finally {
      // An unclosed S3 content stream keeps the underlying HTTP connection checked out.
      content.close()
    }

    val prop = new Properties()
    val in = Files.newInputStream(destination)
    try {
      prop.load(in)
    } finally {
      // Properties.load leaves the stream open, so it has to be closed here.
      in.close()
    }

    // Entries produced by `load` are always String -> String, so no casts are needed.
    prop.stringPropertyNames().asScala.foreach { key =>
      sys.props.put(key, prop.getProperty(key))
    }
  }
}
开发者ID:tsechov,项目名称:s3-bintray-deploy,代码行数:39,代码来源:Access.scala
示例2: HelloWorldSpring
//设置package包名称以及导入依赖的类
package helloworld
import java.io.FileInputStream
import java.util.Properties
import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.support.{DefaultListableBeanFactory, PropertiesBeanDefinitionReader}
/**
 * Entry point that builds a Spring bean factory from a properties file, looks up the
 * `renderer` and `provider` beans, wires the provider into the renderer and renders.
 *
 * NOTE(review): the `App` trait runs this body in delayed initialization; a plain
 * `main` method would avoid its initialization-order pitfalls, but that would change
 * the object's shape, so it is kept as is.
 */
object HelloWorldSpring extends App {
  val factory: BeanFactory = getBeanFactory
  val mr: MessageRenderer = factory.getBean("renderer").asInstanceOf[MessageRenderer]
  val mp: MessageProvider = factory.getBean("provider").asInstanceOf[MessageProvider]
  mr.setMessageProvider(mp)
  mr.render

  /**
   * Creates a bean factory and registers the bean definitions found in
   * `springdi_scala/src/helloworld/beans.properties` (resolved against the working directory).
   *
   * @return the populated bean factory
   * @throws Exception if the properties file cannot be read or the definitions are invalid
   */
  @throws(classOf[Exception])
  private def getBeanFactory: BeanFactory = {
    val beanFactory = new DefaultListableBeanFactory
    val reader = new PropertiesBeanDefinitionReader(beanFactory)
    val props = new Properties
    val in = new FileInputStream("springdi_scala/src/helloworld/beans.properties")
    try {
      props.load(in)
    } finally {
      // Properties.load does not close the stream it reads from.
      in.close()
    }
    reader.registerBeanDefinitions(props)
    beanFactory
  }
}
开发者ID:BBK-PiJ-2015-67,项目名称:sdp-portfolio,代码行数:27,代码来源:HelloWorldSpring.scala
示例3: SimpleKafkaProducer
//设置package包名称以及导入依赖的类
package com.example
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.json4s.{DefaultFormats, jackson}
/**
 * Minimal Kafka producer that serializes messages to JSON and sends them to a fixed topic.
 *
 * @param kafkaSocket address of the Kafka broker; its `toString` is used as `bootstrap.servers`
 * @param topic       topic every message is sent to
 * @param brokers     not used by this class (kept for source compatibility)
 */
class SimpleKafkaProducer(kafkaSocket: Socket, topic: String, brokers: Int = 1) {
  private val serializer = "org.apache.kafka.common.serialization.StringSerializer"

  /** Builds a fresh producer configuration using string serializers for key and value. */
  private def configuration = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaSocket.toString())
    props.put("key.serializer", serializer)
    props.put("value.serializer", serializer)
    props
  }

  /**
   * Serializes `message` to JSON with json4s/jackson and sends it to `topic`.
   *
   * A new producer is created for every call and always closed afterwards, including
   * when `send` throws. Serialization happens before the producer is created so that a
   * serialization failure cannot leak a producer.
   */
  def send[T <: AnyRef](message: T): Unit = {
    implicit val serialization = jackson.Serialization
    implicit val formats = DefaultFormats

    val jsonMessage = serialization.write[T](message)
    val producer = new KafkaProducer[String, String](configuration)
    try {
      producer.send(new ProducerRecord[String, String](topic, jsonMessage))
    } finally {
      // close() waits for outstanding sends to complete before releasing resources.
      producer.close()
    }
  }
}
开发者ID:frossi85,项目名称:financial-statistics-crawler,代码行数:31,代码来源:SimpleKafkaProducer.scala
示例4: Mailer
//设置package包名称以及导入依赖的类
package org.kirhgoff.lastobot
import java.util.Properties
import javax.mail.{Message, Session}
import javax.mail.internet.{InternetAddress, MimeMessage}
import scala.io.Source
/** Sends HTML e-mails through Gmail's SMTP server using STARTTLS. */
object Mailer {
  val host = "smtp.gmail.com"
  val port = "587"
  // NOTE(review): this literal looks like an address redacted by e-mail obfuscation;
  // it is probably not a valid recipient and should be replaced with the real one.
  val address = "[email protected]"
  val username = "lastobot"

  // Password is the concatenation of all lines of ~/.lastobot/.mail; the file handle is
  // released as soon as the content has been read.
  val password = {
    val source = Source.fromFile(System.getProperty("user.home")
      + "/.lastobot/.mail")
    try source.getLines.mkString finally source.close()
  }

  /**
   * Sends `text` as a `text/html` message with the given `subject` to `address`.
   *
   * The SMTP transport is closed after sending, also when connecting or sending fails.
   */
  def sendMail(text:String, subject:String): Unit = {
    val properties = new Properties()
    properties.put("mail.smtp.port", port)
    properties.put("mail.smtp.auth", "true")
    properties.put("mail.smtp.starttls.enable", "true")

    val session = Session.getDefaultInstance(properties, null)
    val message = new MimeMessage(session)
    message.addRecipient(Message.RecipientType.TO, new InternetAddress(address))
    message.setSubject(subject)
    message.setContent(text, "text/html")

    val transport = session.getTransport("smtp")
    try {
      transport.connect(host, username, password)
      transport.sendMessage(message, message.getAllRecipients)
    } finally {
      transport.close()
    }
  }

  /** Manual smoke test: sends a fixed dummy message. */
  def main(args:Array[String]): Unit = {
    sendMail("aaaa", "bbb")
  }
}
开发者ID:kirhgoff,项目名称:lastobot,代码行数:40,代码来源:Mailer.scala
示例5: ReadyKafkaProducer
//设置package包名称以及导入依赖的类
package com.bencassedy.readykafka.producer
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
/** Thin wrapper around a string-keyed, string-valued Kafka producer. */
class ReadyKafkaProducer {

  /**
   * Producer settings for the given broker list.
   *
   * @param brokerList value used for `bootstrap.servers`
   */
  case class KafkaProducerConfigs(brokerList: String = "127.0.0.1:9092") {
    val properties = new Properties()

    locally {
      val stringSerializer = classOf[StringSerializer]
      properties.put("bootstrap.servers", brokerList)
      properties.put("key.serializer", stringSerializer)
      properties.put("value.serializer", stringSerializer)
      // Settings that were left disabled in the original:
      //    properties.put("serializer.class", classOf[StringDeserializer])
      //    properties.put("batch.size", 16384)
      //    properties.put("linger.ms", 1)
      //    properties.put("buffer.memory", 33554432)
    }
  }

  val producer = new KafkaProducer[String, String](KafkaProducerConfigs().properties)

  /**
   * Sends each element of `messages` (without a key) to `topic`, then closes the producer
   * with a 100 ms timeout. The producer is closed here, so it is not usable for a
   * second call on the same instance.
   */
  def produce(topic: String, messages: Iterable[String]): Unit = {
    for (payload <- messages) {
      val record = new ProducerRecord[String, String](topic, payload)
      producer.send(record)
    }
    producer.close(100L, TimeUnit.MILLISECONDS)
  }
}
开发者ID:bencassedy,项目名称:ready-kafka,代码行数:31,代码来源:ReadyKafkaProducer.scala
示例6: Creds
//设置package包名称以及导入依赖的类
package hu.blackbelt.cd.bintray.deploy
import java.io.{File, FileInputStream}
import java.util.Properties
import org.scalatest.{BeforeAndAfter, Suite}
/**
 * Test mix-in that makes credentials available as JVM system properties before each test.
 *
 * Credentials come from `env.properties` in the working directory when that file exists,
 * otherwise from environment variables.
 */
trait Creds extends BeforeAndAfter {
  this: Suite =>

  /**
   * Copies the credential settings named in `Access` from the environment into `prop`.
   *
   * For each key the environment variable with dots replaced by underscores is looked up
   * (e.g. `aws.accessKeyId` -> `aws_accessKeyId`); variables that are not set are skipped.
   */
  def fillFromEnv(prop: Properties) = {
    def put(key: String) = sys.env.get(key.replace('.','_')).map(prop.put(key.replace('_','.'), _))
    put(Access.aws_accessKeyId)
    put(Access.aws_secretKey)
    put(Access.bintray_organization)
    put(Access.bintray_user)
    put(Access.bintray_apikey)
  }

  before {
    import scala.collection.JavaConverters._

    val prop = new Properties()
    val propsFile = new File("env.properties")
    if (propsFile.exists()) {
      val in = new FileInputStream(propsFile)
      try {
        prop.load(in)
      } finally {
        // Properties.load leaves the stream open; close it so the file handle does not
        // leak once per test.
        in.close()
      }
    } else {
      fillFromEnv(prop)
    }

    // Both sources only ever store String keys and values, so no casts are required.
    prop.stringPropertyNames().asScala.foreach { key =>
      sys.props.put(key, prop.getProperty(key))
    }
  }
}
开发者ID:tsechov,项目名称:s3-bintray-deploy,代码行数:38,代码来源:Creds.scala
示例7: Generator
//设置package包名称以及导入依赖的类
package data.processing.kafkagenerator
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import com.github.andr83.scalaconfig._
import com.yammer.metrics.core.{MetricName, MetricsRegistry}
import data.processing.avro.AvroEncoder
import scala.concurrent.forkjoin.ThreadLocalRandom
/** Generates random Avro-encoded events and publishes them to Kafka at a bounded rate. */
object Generator {
  val metricsRegistry = new MetricsRegistry
  val config = ConfigFactory.load()
  // Kafka client settings are read from the `kafka-client` config section.
  val props = config.getConfig("kafka-client").as[Properties]
  val topic = config.getString("kafka-client.topic")
  val numberOfUsers = config.getInt("generator.number.of.users")
  val urls = config.getStringList("generator.urls")
  val eventTypes = config.getStringList("generator.event.types")
  val throughput = config.getInt("generator.throughput")
  val avroEncoder = new AvroEncoder("/event-record.json")

  /**
   * Builds one random event.
   *
   * @return the random event id paired with the result of encoding
   *         `(id, timestamp, userId, url, eventType)` with `avroEncoder`
   */
  def generateEvent() = {
    val id = ThreadLocalRandom.current().nextLong()
    val ts = java.lang.System.currentTimeMillis()
    val userId = ThreadLocalRandom.current().nextInt(numberOfUsers).toHexString
    val url = urls.get(ThreadLocalRandom.current().nextInt(urls.size()))
    val eventType = eventTypes.get(ThreadLocalRandom.current().nextInt(eventTypes.size()))
    (id, avroEncoder.encode((id, ts, userId, url, eventType)))
  }

  /**
   * Sends events forever, keeping the meter's mean rate below `throughput` events per
   * second by sleeping 1 ms whenever the rate is too high.
   *
   * The loop only ends through an exception (e.g. an interrupt during `sleep`); the
   * producer is flushed and closed in `finally` so that cleanup is actually reachable.
   */
  def main(args: Array[String]): Unit = {
    val meter = metricsRegistry.newMeter(new MetricName("", "", ""), "", TimeUnit.SECONDS)
    val producer = new KafkaProducer[String, Array[Byte]](props)
    try {
      while (true) {
        if (meter.meanRate < throughput) {
          meter.mark()
          val event = generateEvent()
          producer.send(new ProducerRecord[String, Array[Byte]](topic, event._1.toString, event._2))
        }
        else {
          Thread.sleep(1)
        }
      }
    } finally {
      producer.flush()
      producer.close()
    }
  }
}
开发者ID:ipogudin,项目名称:data-processing-examples,代码行数:56,代码来源:Generator.scala
示例8: DelegatingPdxSerializer
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.pdx
import java.util.Properties
import org.apache.geode.cache.Declarable
import org.apache.geode.pdx.{PdxReader, PdxSerializer, PdxWriter}
/**
 * Reads an instance of `clazz` from `in`.
 *
 * The serializer stored under `clazz` is used when there is one. Otherwise the first
 * serializer whose key class satisfies `isPdxCompat` against `clazz` is tried; when it
 * yields a non-null value it is registered for `clazz`.
 *
 * @return the deserialized value, or `null` when no serializer applies or the chosen
 *         serializer produced `null`
 */
override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
  // Evaluated only when there is no serializer stored under `clazz`.
  def viaCompatibleSerializer: Option[AnyRef] =
    serializers.collectFirst {
      case (candidate, ser) if isPdxCompat(candidate, clazz) =>
        val value = ser.fromData(clazz, in)
        if (value != null) register(ser, clazz)
        value
    }

  serializers.get(clazz) match {
    case Some(exact) => exact.fromData(clazz, in)
    case None => viaCompatibleSerializer.orNull
  }
}
/** Intentionally a no-op: `props` is ignored. */
override def init(props: Properties): Unit = ()
}
开发者ID:akka,项目名称:alpakka,代码行数:23,代码来源:DelegatingPdxSerializer.scala
示例9: ClickhouseConnectionFactory
//设置package包名称以及导入依赖的类
package io.clickhouse.ext
import java.util.Properties
import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties
/**
 * Caches one `ClickHouseDataSource` per `(host, port)` pair.
 *
 * NOTE(review): the cache is a plain mutable map without synchronization — confirm
 * whether this object is ever used from several threads at once.
 */
object ClickhouseConnectionFactory extends Serializable{

  private val dataSources = scala.collection.mutable.Map[(String, Int), ClickHouseDataSource]()

  /**
   * Returns the cached data source for `host`/`port`, creating and caching it on first use.
   *
   * @param host ClickHouse host name
   * @param port ClickHouse port (default 8123)
   */
  def get(host: String, port: Int = 8123): ClickHouseDataSource =
    dataSources.getOrElseUpdate((host, port), createDatasource(host, port = port))

  /**
   * Builds a data source for `jdbc:clickhouse://host:port`; when `dbO` is defined its value
   * is passed on as the `database` property.
   */
  private def createDatasource(host: String, dbO: Option[String] = None, port: Int = 8123) = {
    val props = new Properties()
    dbO.foreach(props.setProperty("database", _))
    new ClickHouseDataSource(s"jdbc:clickhouse://$host:$port", new ClickHouseProperties(props))
  }
}
开发者ID:DmitryBe,项目名称:spark-clickhouse,代码行数:30,代码来源:ClickhouseConnectionFactory.scala
示例10: DataDriver
//设置package包名称以及导入依赖的类
package org.hpi.esb.datasender
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.hpi.esb.commons.config.Configs
import org.hpi.esb.commons.util.Logging
import org.hpi.esb.datasender.config._
import org.hpi.esb.datasender.output.writers.DatasenderRunResultWriter
import scala.io.Source
/**
 * Builds the data reader, Kafka producer, result writer and data producer from the
 * configuration and runs the data producer.
 */
class DataDriver() extends Logging {

  // Initialization order matters: later fields are built from earlier ones.
  private val topics = Configs.benchmarkConfig.sourceTopics
  private val config = ConfigHandler.config
  private val dataReader = createDataReader(config.dataReaderConfig)
  private val kafkaProducerProperties = createKafkaProducerProperties(config.kafkaProducerConfig)
  private val kafkaProducer = new KafkaProducer[String, String](kafkaProducerProperties)
  private val resultHandler = new DatasenderRunResultWriter(config, Configs.benchmarkConfig, kafkaProducer)
  private val dataProducer = createDataProducer(kafkaProducer, dataReader, resultHandler)

  /** Starts the data producer. */
  def run(): Unit = dataProducer.execute()

  /**
   * Translates `kafkaProducerConfig` into Kafka producer `Properties`.
   *
   * NOTE(review): `batchSize` is unwrapped with `.get` before `toString`, whereas
   * `lingerTime` and `bufferMemorySize` are stringified directly — confirm that those two
   * are plain values and not Options.
   */
  def createKafkaProducerProperties(kafkaProducerConfig: KafkaProducerConfig): Properties = {
    val settings = Seq[(String, AnyRef)](
      (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerConfig.bootstrapServers.get),
      (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerConfig.keySerializerClass.get),
      (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerConfig.valueSerializerClass.get),
      (ProducerConfig.ACKS_CONFIG, kafkaProducerConfig.acks.get),
      (ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerConfig.batchSize.get.toString),
      (ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfig.lingerTime.toString),
      (ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProducerConfig.bufferMemorySize.toString)
    )
    val props = new Properties()
    settings.foreach { case (key, value) => props.put(key, value) }
    props
  }

  /** Creates a `DataReader` over the input file described by `dataReaderConfig`. */
  def createDataReader(dataReaderConfig: DataReaderConfig): DataReader = {
    val input = Source.fromFile(dataReaderConfig.dataInputPath.get)
    val columns = dataReaderConfig.columns.get
    val delimiter = dataReaderConfig.columnDelimiter.get
    val firstDataColumn = dataReaderConfig.dataColumnStart.get
    new DataReader(input, columns, delimiter, firstDataColumn, dataReaderConfig.readInRam)
  }

  /**
   * Creates the `DataProducer` from the given collaborators plus thread count, sending
   * interval, duration and single-column mode taken from the configuration.
   */
  def createDataProducer(kafkaProducer: KafkaProducer[String, String], dataReader: DataReader,
                         resultHandler: DatasenderRunResultWriter): DataProducer =
    new DataProducer(
      resultHandler,
      kafkaProducer,
      dataReader,
      topics,
      config.dataSenderConfig.numberOfThreads.get,
      Configs.benchmarkConfig.sendingInterval,
      Configs.benchmarkConfig.getSendingIntervalTimeUnit(),
      Configs.benchmarkConfig.duration,
      Configs.benchmarkConfig.getDurationTimeUnit(),
      config.dataSenderConfig.singleColumnMode
    )
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:62,代码来源:DataDriver.scala
示例11: EmailParser
//设置package包名称以及导入依赖的类
package uk.pkerrigan.dmarcparser
import java.io.ByteArrayInputStream
import java.nio.charset.CodingErrorAction
import java.util.Properties
import java.util.zip.{GZIPInputStream, ZipInputStream}
import javax.activation.DataSource
import javax.mail.Session
import javax.mail.internet.MimeMessage
import scala.collection.JavaConverters._
import org.apache.commons.mail.util.MimeMessageParser
import uk.pkerrigan.dmarcparser.report.Feedback
import scala.io._
/**
 * Extracts a DMARC feedback report from the first attachment of a raw e-mail.
 *
 * @param parser parser applied to the decompressed XML of the attachment
 */
class EmailParser(parser: ParserTrait = new Parser()) extends EmailParserTrait{

  // UTF-8 codec that replaces malformed/unmappable input instead of throwing.
  implicit val codec = Codec("UTF-8")
  codec.onMalformedInput(CodingErrorAction.REPLACE)
  codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

  /**
   * Parses `email` as a MIME message and tries to extract a report from its first attachment.
   *
   * @param email the raw message text
   * @return the parsed report, or `None` when there is no attachment, the attachment has an
   *         unsupported content type, or it decompresses to an empty string
   */
  def parseEmail(email: String): Option[Feedback] = {
    val session = Session.getDefaultInstance(new Properties())
    // Encode with the codec's charset rather than the platform default so the result does
    // not depend on the JVM's locale settings.
    val rawMessage = new ByteArrayInputStream(email.getBytes(codec.charSet))
    val message = new MimeMessage(session, rawMessage)
    val messageParser = new MimeMessageParser(message).parse()
    messageParser.getAttachmentList.asScala.headOption.flatMap(extract)
  }

  /** Dispatches on the attachment's content type; unknown or missing types yield `None`. */
  private def extract(a: DataSource): Option[Feedback] = a.getContentType match {
    case "application/gzip" | "application/x-gzip" => extractGzip(a)
    case "application/zip" | "application/x-zip-compressed" => extractZip(a)
    case _ => None
  }

  /** Reads the first entry of a zip attachment; the stream is closed afterwards. */
  private def extractZip(a: DataSource): Option[Feedback] = {
    val zip = new ZipInputStream(a.getInputStream)
    try {
      // Position the stream on the first entry; subsequent reads return that entry's bytes.
      zip.getNextEntry
      parseXml(zip)
    } finally {
      zip.close()
    }
  }

  /** Reads a gzip attachment; the stream is closed afterwards. */
  private def extractGzip(a: DataSource): Option[Feedback] = {
    val gzip = new GZIPInputStream(a.getInputStream)
    try {
      parseXml(gzip)
    } finally {
      gzip.close()
    }
  }

  /** Decodes all of `in` with the implicit codec and parses it unless it is empty. */
  private def parseXml(in: java.io.InputStream): Option[Feedback] = {
    val rawXml = Source.fromInputStream(in).mkString
    if (rawXml == "") None else Some(parser.parse(rawXml))
  }
}
开发者ID:patrickkerrigan,项目名称:dmarc-parser,代码行数:52,代码来源:EmailParser.scala
示例12: kafka（productservice 包对象）
//设置package包名称以及导入依赖的类
package com.github.simonthecat.eventdrivenorders.productservice
import java.util.Properties
/** Kafka client settings used by the product service. */
package object kafka {

  /** Consumer settings for group `product.consumers`. */
  val orderConsumerCfg: Properties = propertiesOf(
    "bootstrap.servers" -> "localhost:9092",
    "group.id" -> "product.consumers",
    "enable.auto.commit" -> "true",
    "auto.commit.interval.ms" -> "1000",
    "session.timeout.ms" -> "30000",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
  )

  /** Producer settings using string serializers for key and value. */
  val confirmationProducerCfg: Properties = propertiesOf(
    "bootstrap.servers" -> "localhost:9092",
    "acks" -> "all",
    "retries" -> "0",
    "batch.size" -> "16384",
    "linger.ms" -> "1",
    "buffer.memory" -> "33554432",
    "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
  )

  /** Builds a new `Properties` instance holding exactly the given entries. */
  private def propertiesOf(entries: (String, String)*): Properties = {
    val props = new Properties()
    entries.foreach { case (key, value) => props.put(key, value) }
    props
  }
}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:33,代码来源:package.scala
示例13: kafka（orderservice 包对象）
//设置package包名称以及导入依赖的类
package com.github.simonthecat.eventdrivenorders.orderservice
import java.util.Properties
/** Kafka client settings used by the order service. */
package object kafka {

  /** Consumer settings for group `order.consumers`. */
  val orderConsumerCfg: Properties = orderGroupConsumer()

  /** Same settings as `orderConsumerCfg`, held in a separate `Properties` instance. */
  val storeConfirmationConsumer: Properties = orderGroupConsumer()

  /** Producer settings using string serializers for key and value. */
  val producerCfg: Properties = propertiesOf(
    "bootstrap.servers" -> "localhost:9092",
    "acks" -> "all",
    "retries" -> "0",
    "batch.size" -> "16384",
    "linger.ms" -> "1",
    "buffer.memory" -> "33554432",
    "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
  )

  /** Creates a fresh consumer configuration for group `order.consumers`. */
  private def orderGroupConsumer(): Properties = propertiesOf(
    "bootstrap.servers" -> "localhost:9092",
    "group.id" -> "order.consumers",
    "enable.auto.commit" -> "true",
    "auto.commit.interval.ms" -> "1000",
    "session.timeout.ms" -> "30000",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
  )

  /** Builds a new `Properties` instance holding exactly the given entries. */
  private def propertiesOf(entries: (String, String)*): Properties = {
    val props = new Properties()
    entries.foreach { case (key, value) => props.put(key, value) }
    props
  }
}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:45,代码来源:package.scala
示例14: kafka（api 包对象）
//设置package包名称以及导入依赖的类
package com.github.eventdrivenorders.api
import java.util.Properties
/** Kafka client settings used by the API module. */
package object kafka {

  /** Producer settings using string serializers for key and value. */
  val producerCfg: Properties = propertiesOf(
    "bootstrap.servers" -> "localhost:9092",
    "acks" -> "all",
    "retries" -> "0",
    "batch.size" -> "16384",
    "linger.ms" -> "1",
    "buffer.memory" -> "33554432",
    "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
  )

  /** Consumer settings for group `api.consumers`. */
  val orderStatusConsumer: Properties = propertiesOf(
    "bootstrap.servers" -> "localhost:9092",
    "group.id" -> "api.consumers",
    "enable.auto.commit" -> "true",
    "auto.commit.interval.ms" -> "1000",
    "session.timeout.ms" -> "30000",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
  )

  /** Builds a new `Properties` instance holding exactly the given entries. */
  private def propertiesOf(entries: (String, String)*): Properties = {
    val props = new Properties()
    entries.foreach { case (key, value) => props.put(key, value) }
    props
  }
}
开发者ID:simonko91,项目名称:event-driven-orders,代码行数:34,代码来源:package.scala
示例15: Helpers
//设置package包名称以及导入依赖的类
package com.github.mmolimar.vkitm.utils
import java.util.Properties
import java.util.concurrent.{CancellationException, TimeUnit, Future => JFuture}
import com.typesafe.config.Config
import org.jboss.netty.util.{HashedWheelTimer, Timeout, TimerTask}
import scala.concurrent.{Future, Promise}
import scala.util.Try
/** Implicit helpers: Java `Future` -> Scala `Future`, and Typesafe `Config` -> `Properties`. */
object Helpers {

  private val pollIntervalMs = 50L
  // Shared timer used to re-check pending Java futures every `pollIntervalMs`.
  private val timer = new HashedWheelTimer(pollIntervalMs, TimeUnit.MILLISECONDS)

  implicit class JFutureHelpers[T](jf: JFuture[T]) {

    /**
     * Adapts the Java future to a Scala `Future` by polling it on the shared timer.
     *
     * The returned future fails with `CancellationException` when the Java future was
     * cancelled, and otherwise completes with the outcome of `jf.get` (value or exception).
     */
    def asScala: Future[T] = {
      val promise = Promise[T]()

      // Completes the promise when `jf` is finished; otherwise schedules another check.
      def checkCompletion(): Unit = {
        if (jf.isCancelled) {
          promise.failure(new CancellationException())
        } else if (jf.isDone) {
          promise.complete(Try(jf.get))
        } else {
          scheduleTimeout()
        }
        ()
      }

      def scheduleTimeout(): Unit = {
        timer.newTimeout(new TimerTask {
          override def run(timeout: Timeout): Unit = checkCompletion()
        }, pollIntervalMs, TimeUnit.MILLISECONDS)
        ()
      }

      checkCompletion()
      promise.future
    }
  }

  /**
   * Flattens `config` into `Properties`: one entry per config path, mapped to the
   * unwrapped value of that path.
   *
   * Uses explicit `JavaConverters` and `put` instead of the deprecated implicit
   * `JavaConversions`, `breakOut` and `putAll` combination.
   */
  implicit def propsFromConfig(config: Config): Properties = {
    import scala.collection.JavaConverters._

    val props = new Properties()
    config.entrySet().asScala.foreach { entry =>
      props.put(entry.getKey, entry.getValue.unwrapped())
    }
    props
  }
}
开发者ID:mmolimar,项目名称:vkitm,代码行数:58,代码来源:Helpers.scala
示例16: EmbeddedVKitM
//设置package包名称以及导入依赖的类
package com.github.mmolimar.vkitm.embedded
import java.util.Properties
import com.github.mmolimar.vkitm.server.{VKitMConfig, VKitMServer}
import com.github.mmolimar.vkitm.utils.TestUtils
import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.clients.producer.ProducerConfig
/**
 * Embedded VKitM server for tests.
 *
 * @param zkConnection ZooKeeper connection string
 * @param brokerList   Kafka broker in `host:port` form
 * @param port         port the VKitM server listens on (defaults to a free port)
 */
class EmbeddedVKitM(zkConnection: String,
                    brokerList: String,
                    port: Int = TestUtils.getAvailablePort) extends Logging {

  // Stays null until `startup()` has run; `getServer` exposes it as is.
  private var vkitmServer: VKitMServer = null

  /** Builds server, producer and consumer properties and starts the VKitM server. */
  def startup(): Unit = {
    info("Starting up VKitM server")

    val serverProps = new Properties
    serverProps.setProperty(KafkaConfig.ZkConnectProp, zkConnection)
    serverProps.setProperty(KafkaConfig.HostNameProp, "localhost")
    serverProps.setProperty(KafkaConfig.PortProp, port.toString)
    serverProps.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:" + port)

    val producerProps = new Properties
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)

    // NOTE(review): takes the text after the first ':' of `brokerList`; this throws for
    // a list without a port and yields a wrong value for multi-broker lists.
    val brokerPort = brokerList.split(":")(1)

    val consumerProps = new Properties
    consumerProps.setProperty(KafkaConfig.ZkConnectProp, zkConnection)
    consumerProps.setProperty(KafkaConfig.HostNameProp, "localhost")
    consumerProps.setProperty(KafkaConfig.PortProp, brokerPort)
    consumerProps.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:" + brokerPort)

    vkitmServer = new VKitMServer(VKitMConfig.fromProps(serverProps, producerProps, consumerProps))
    vkitmServer.startup()

    info("Started embedded VKitM server")
  }

  /** Shuts the server down; does nothing when `startup()` was never called. */
  def shutdown(): Unit = {
    Option(vkitmServer).foreach(_.shutdown())
  }

  def getPort: Int = port

  def getBrokerList: String = "localhost:" + getPort

  /** @return the running server, or `null` before `startup()` */
  def getServer: VKitMServer = vkitmServer

  /** Renders the server config; prints `null` for the config before `startup()`. */
  override def toString: String = {
    val config = Option(vkitmServer).map(server => s"${server.config}").getOrElse("null")
    s"VKitM{config='${config}'}"
  }
}
开发者ID:mmolimar,项目名称:vkitm,代码行数:62,代码来源:EmbeddedVKitM.scala
示例17: SimpleKafkaConsumer
//设置package包名称以及导入依赖的类
package com.example
import java.nio.charset.StandardCharsets
import java.util.Properties
import kafka.consumer.ConsumerConfig
import org.json4s.{DefaultFormats, jackson}
import scala.collection.immutable.HashMap
/**
 * JSON-decoding consumer built on Kafka's old ZooKeeper-based consumer API.
 *
 * @param kafkaSocket     broker address; its `toString` is used as `bootstrap.servers`
 * @param zooKeeperSocket ZooKeeper address; its `toString` is used as `zookeeper.connect`
 * @param groupId         consumer group id
 * @param topic           topic to read from
 */
class SimpleKafkaConsumer(kafkaSocket: Socket, zooKeeperSocket: Socket, groupId: String, topic: String) {

  /** Assembles the consumer settings into a new `Properties` instance. */
  private def configuration = {
    val deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    val settings = Seq(
      "bootstrap.servers" -> kafkaSocket.toString(),
      "key.deserializer" -> deserializer,
      "value.deserializer" -> deserializer,
      "group.id" -> groupId,
      "consumer.id" -> "consumer0",
      "consumer.timeout" -> "-1",
      "auto.offset.reset" -> "smallest",
      "zookeeper.sync.time.ms" -> "200",
      "zookeeper.session.timeout.ms" -> "6000",
      "zookeeper.connect" -> zooKeeperSocket.toString(),
      "num.consumer.fetchers" -> "2",
      "rebalance.max.retries" -> "4",
      "auto.commit.interval.ms" -> "1000"
    )
    val props = new Properties()
    settings.foreach { case (key, value) => props.put(key, value) }
    props
  }

  private val consumer = kafka.consumer.Consumer.create(new ConsumerConfig(configuration))

  /**
   * Opens one message stream on `topic` and lazily decodes each message from UTF-8 JSON
   * into a `T`.
   *
   * @return a lazily evaluated stream of decoded messages
   */
  def read[T <: AnyRef]()(implicit m: Manifest[T]): Iterable[T] = {
    implicit val serialization = jackson.Serialization
    implicit val formats = DefaultFormats

    val streamsByTopic = consumer.createMessageStreams(HashMap(topic -> 1))
    val messages = streamsByTopic.get(topic).get(0).iterator()
    messages.map { record =>
      val json = new String(record.message(), StandardCharsets.UTF_8)
      serialization.read[T](json)
    }.toStream
  }

  /** Shuts down the underlying consumer connector. */
  def shutdown() = consumer.shutdown()
}
开发者ID:frossi85,项目名称:financial-statistics-collector,代码行数:48,代码来源:SimpleKafkaConsumer.scala
示例18: EmbeddedKafka
//设置package包名称以及导入依赖的类
package com.softwaremill.embeddedkafka
import java.io.IOException
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.util.Properties
import akka.actor.Actor
import utils.embeddedkafka.KafkaLocal
/**
 * Actor that owns a local Kafka instance: it wipes `./data`, starts Kafka in `preStart`,
 * notifies its parent with `"Start"`, and stops Kafka in `postStop`.
 */
class EmbeddedKafka extends Actor {

  var embeddedKafka: Option[KafkaLocal] = None

  override def preStart(): Unit = {
    super.preStart()
    deleteKafkaData()
    embeddedKafka = Some(initEmbeddedKafka())
    context.parent ! "Start"
  }

  override def postStop(): Unit = {
    embeddedKafka.foreach(_.stop())
    super.postStop()
  }

  /** Creates a `KafkaLocal` from the `/kafkalocal.properties` and `/zklocal.properties` resources. */
  def initEmbeddedKafka() = {
    val kafkaProperties = loadResource("/kafkalocal.properties")
    val zkProperties = loadResource("/zklocal.properties")
    new KafkaLocal(kafkaProperties, zkProperties)
  }

  /**
   * Loads a classpath resource as `Properties` and closes the stream afterwards.
   *
   * @throws IOException if the resource does not exist or cannot be read
   */
  private def loadResource(name: String): Properties = {
    val in = getClass.getResourceAsStream(name)
    // getResourceAsStream signals "not found" with null; fail with a clear message
    // instead of a NullPointerException inside Properties.load.
    if (in == null) throw new IOException(s"Classpath resource not found: $name")
    try {
      val props = new Properties()
      props.load(in)
      props
    } finally {
      in.close()
    }
  }

  /** Ignores every message. */
  override def receive: Actor.Receive = {
    case _ =>
  }

  /**
   * Recursively deletes `./data`: files as they are visited, directories after their
   * contents. Entries that cannot be visited are skipped.
   */
  def deleteKafkaData(): Unit = {
    val path = Paths.get("./data")
    Files.walkFileTree(path, new FileVisitor[Path] {
      override def visitFileFailed(file: Path, exc: IOException) = FileVisitResult.CONTINUE

      override def visitFile(file: Path, attrs: BasicFileAttributes) = {
        Files.delete(file)
        FileVisitResult.CONTINUE
      }

      override def preVisitDirectory(dir: Path, attrs: BasicFileAttributes) = FileVisitResult.CONTINUE

      override def postVisitDirectory(dir: Path, exc: IOException) = {
        Files.delete(dir)
        FileVisitResult.CONTINUE
      }
    })
  }
}
开发者ID:jw3,项目名称:example-kafka-tweets,代码行数:58,代码来源:EmbeddedKafka.scala
示例19: KafkaUtilities
//设置package包名称以及导入依赖的类
package com.fortysevendeg.log.utils
import java.util.Properties
import kafka.admin.AdminUtils
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkConnection
import org.apache.kafka.clients.consumer.KafkaConsumer
/** Factory and helper methods for Kafka producers, consumers and topics on localhost. */
object KafkaUtilities {

  /** Creates an async producer (old producer API) for `localhost:9092` using `StringEncoder`. */
  def createKafkaProducer(): Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    //    props.put("partitioner.class", "com.fortysevendeg.biglog.SimplePartitioner")
    //    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    //    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("producer.type", "async")
    props.put("request.required.acks", "1")

    val config = new ProducerConfig(props)
    new Producer[String, String](config)
  }

  /** Creates a string-deserializing consumer for `localhost:9092`. */
  def createKafkaConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    new KafkaConsumer[String, String](props)
  }

  /**
   * Creates `topic` through ZooKeeper at `localhost:2181` with default topic settings.
   *
   * The ZooKeeper client is closed even when topic creation throws.
   */
  def createTopicIntoKafka(topic: String, numPartitions: Int, replicationFactor: Int): Unit = {
    val zookeeperConnect = "localhost:2181"
    val sessionTimeoutMs = 10 * 1000
    val connectionTimeoutMs = 8 * 1000

    val zkClient = ZkUtils.createZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs)
    try {
      val zkUtils = new ZkUtils(zkClient, zkConnection = new ZkConnection(zookeeperConnect), isSecure = false)
      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, new Properties)
    } finally {
      zkClient.close()
    }
  }

  /** Sends `message` (without a key) to `topic` using `kafkaProducer`. */
  def d(kafkaProducer: Producer[String, String], topic: String, message: String) = {
    kafkaProducer.send(new KeyedMessage[String, String](topic, message))
  }
}
开发者ID:javipacheco,项目名称:spark-android-log,代码行数:52,代码来源:KafkaUtilities.scala
示例20: AppConfigs
//设置package包名称以及导入依赖的类
package com.groupon.dse.configs
import java.util.Properties
import org.apache.spark.storage.StorageLevel
/**
 * Configuration keys paired with their default values, plus validation helpers.
 *
 * Every constant is a `(propertyName, defaultValue)` tuple.
 */
object AppConfigs {
  // Spark
  val SparkReceivers = "spark.num.receivers" -> "1"
  val SparkStorageLevel = "spark.storage.level" -> "MEMORY_AND_DISK_SER_2"

  // Topics
  val Topics = "topics" -> ""
  val TopicsBlackList = "topics.blacklist" -> ""
  val TopicsEnableBlockingConsumption = "topic.consumption.blocking" -> "false"
  val TopicConsumptionPolicy = "topic.consumption.policy" -> "OFFSET"
  val TopicConsumptionOffsetThreshold = "topic.consumption.offset.threshold" -> "0"
  val TopicConsumptionTimeThresholdMs = "topic.consumption.time.threshold.ms" -> "1000"
  val TopicFetchSizeBytes = "topic.fetch.size.bytes" -> "1048576"
  val TopicRepartitionFactor = "topic.repartition.factor" -> "1"
  // -1: max offset, -2: min offset, anything else: the actual offset value
  val TopicStartOffset = "topic.start.offset" -> "-1"

  // Partitions and receivers
  val PartitionRefreshIntervalMs = "partition.refresh.interval.ms" -> "30000"
  val PartitionWarmUpRefreshIntervalMs = "partition.warmup.refresh.interval.ms" -> "10000"
  val ReceiverRestIntervalOnFailMs = "receiver.rest.interval.fail.ms" -> "2500"
  val ReceiverRestIntervalOnSuccessMs = "receiver.rest.interval.success.ms" -> "100"

  // Kafka
  val KafkaBrokerConnect = "kafka.broker.zk.connect" -> ""
  val KafkaSocketTimeoutMs = "kafka.socket.timeout" -> "10000"
  val KafkaSocketBufferSizeBytes = "kafka.socket.buffer.size" -> "1048576"
  val KafkaZkSessionTimeoutMs = "kafka.zk.session.timeout.ms" -> "10000"
  val KafkaZkConnectionTimeoutMs = "kafka.zk.connection.timeout.ms" -> "10000"

  // State controller
  val StateControllerType = "statecontroller.type" -> "MEMORY"
  val ZookeeperStateControllerConnect = "statecontroller.zk.connect" -> ""
  val ZookeeperStateControllerRoot = "statecontroller.zk.root" -> "/baryon"
  val ZookeeperStateControllerConnTimeoutMs = "statecontroller.zk.conn.timeout.ms" -> "120000"
  val ZookeeperStateControllerSessionTimeoutMs = "statecontroller.zk.session.timeout.ms" -> "60000"

  // Topic fetcher
  val TopicFetcherType = "topics.fetcher.type" -> "LOCAL"
  val HDFSTopicSource = "topics.fetcher.hdfs.source" -> ""
  val HTTPTopicSource = "topics.fetcher.http.source" -> ""

  /**
   * Reads a boolean setting that must be spelled exactly `true` or `false`.
   *
   * @param properties      source of the setting
   * @param propertyName    key to look up
   * @param propertyDefault value used when the key is absent
   * @return the parsed boolean
   * @throws InvalidConfigException if the effective value is neither `true` nor `false`
   */
  def validatedBooleanConfig(
                              properties: Properties,
                              propertyName: String,
                              propertyDefault: String)
  : Boolean = {
    val effective = properties.getProperty(propertyName, propertyDefault)
    if (effective == "true") true
    else if (effective == "false") false
    else throw InvalidConfigException(s"$propertyName should be set to true or false")
  }

  case class MissingConfigException(message: String) extends Exception(message)

  case class InvalidConfigException(message: String) extends Exception(message)

}
开发者ID:groupon,项目名称:baryon,代码行数:60,代码来源:AppConfigs.scala
注：本文中的java.util.Properties类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论