本文整理汇总了Scala中kafka.consumer.ConsumerConfig类的典型用法代码示例。如果您正苦于以下问题:Scala ConsumerConfig类的具体用法?Scala ConsumerConfig怎么用?Scala ConsumerConfig使用的例子?那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ConsumerConfig类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: KafkaClient
//设置package包名称以及导入依赖的类
package services.Kafka
import java.util.Properties
import java.util.concurrent.Executors
import akka.actor.{Props, DeadLetter, ActorSystem}
import kafka.consumer.{Consumer, ConsumerConfig}
import scala.concurrent.{ExecutionContext, Future}
/**
 * Wires a ZooKeeper-based Kafka consumer to an actor system.
 *
 * Initialising this object builds the consumer configuration, starts the actor system,
 * subscribes a dead-letter listener and creates a dedicated execution context.
 * Consumption itself only begins when `start` is called.
 */
object KafkaClient {

  // Consumer settings, applied in declaration order before the config object is built.
  val config = {
    val settings = new Properties()
    Seq(
      "bootstrap.servers" -> "localhost:9092",
      "group.id" -> "pogo_consumer",
      "auto.offset.reset" -> "largest",
      "zookeeper.connect" -> "localhost:2181",
      "zookeeper.session.timeout.ms" -> "400",
      "zookeeper.sync.time.ms" -> "200",
      "auto.commit.interval.ms" -> "500"
    ).foreach { case (key, value) => settings.put(key, value) }
    new ConsumerConfig(settings)
  }

  // Actor system that is handed to the per-topic services.
  val system = ActorSystem("es-sharpshooter")

  // An IndexService actor, registered under the name "dead-letters", receives DeadLetter events.
  private val deadLetterListener = system.actorOf(Props[IndexService], "dead-letters")
  system.eventStream.subscribe(deadLetterListener, classOf[DeadLetter])

  // Fixed pool of 20 threads on which the consuming futures below are run.
  implicit val KafkaContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(20))

  /**
   * Creates the consumer connector, requests one stream for each of the "pokemons" and
   * "spawnpoints" topics, and processes each topic's streams inside its own future.
   * A failure of either future is printed to standard output.
   */
  def start = {
    val connector = Consumer.create(config)
    val streamsByTopic = connector.createMessageStreams(Map("pokemons" -> 1, "spawnpoints" -> 1))

    val pokemonFeed = Future {
      streamsByTopic.get("pokemons").get.foreach(PokemonService.cycle(system))
    }
    pokemonFeed onFailure { case ec => println(ec) }

    val spawnFeed = Future {
      streamsByTopic.get("spawnpoints").get.foreach(SpawnService.cycle(system))
    }
    spawnFeed onFailure { case ec => println(ec) }
  }
}
开发者ID:fiahil,项目名称:Talks,代码行数:53,代码来源:KafkaClient.scala
示例2: KafkaConsumer
//设置package包名称以及导入依赖的类
package Services
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
/**
 * Thin wrapper around the ZooKeeper-based Kafka consumer that hands out messages one at a time.
 *
 * The connector and a single message stream are created as soon as an instance is constructed.
 *
 * @param topic            expression passed to `Whitelist` to select the topics to consume
 * @param groupId          value of the `group.id` consumer property
 * @param zookeeperConnect value of the `zookeeper.connect` consumer property
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {
  private val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  props.put("auto.offset.reset", "smallest")
  // With a consumer timeout set, a wait for the next message ends in ConsumerTimeoutException,
  // which hasNext() below translates into `false`.
  props.put("consumer.timeout.ms", "500")
  props.put("auto.commit.interval.ms", "500")

  private val config = new ConsumerConfig(props)
  private val connector = Consumer.create(config)
  private val filterSpec = new Whitelist(topic)
  // Exactly one stream is requested, so index 0 is the only stream.
  private val streams =
    connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /**
   * Reads the next message, if any.
   *
   * @return the message payload decoded as UTF-8 text, or `None` when no message is
   *         available or a non-fatal error occurred while reading
   */
  def read(): Option[String] =
    try {
      if (hasNext()) {
        println("Getting message from queue.............")
        val message = iterator.next().message()
        // Decode with an explicit charset so the result does not depend on the JVM default.
        Some(new String(message, java.nio.charset.StandardCharsets.UTF_8))
      } else {
        None
      }
    } catch {
      // NonFatal lets OutOfMemoryError, InterruptedException and similar propagate.
      case scala.util.control.NonFatal(ex) =>
        ex.printStackTrace()
        None
    }

  /** Returns `false` on consumer timeout or on any non-fatal error, `true` if a message is ready. */
  private def hasNext(): Boolean =
    try
      iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case scala.util.control.NonFatal(ex) =>
        // Report the cause instead of discarding it silently.
        ex.printStackTrace()
        println("Getting error when reading message ")
        false
    }

  /** Shuts down the underlying consumer connector. */
  def close(): Unit = connector.shutdown()
}
开发者ID:pranjut,项目名称:consul-kafka-microservices,代码行数:58,代码来源:KafkaConsumer.scala
示例3: SimpleKafkaProducerTest
//设置package包名称以及导入依赖的类
package com.example
import java.nio.charset.StandardCharsets
import kafka.consumer.ConsumerConfig
import kafka.utils.TestUtils
import org.scalatest.{FunSpec, Matchers}
//import org.junit.Assert._
import scala.collection.immutable.HashMap
/**
 * Round-trip test: sends one message through `SimpleKafkaProducer` and reads it back
 * with the ZooKeeper-based consumer.
 */
class SimpleKafkaProducerTest extends FunSpec with Matchers{
  private val topic = "test"
  private val groupId = "group0"
  private val kafkaHelpers = new KafkaHelpers()

  case class MessageData(a: String, b: String)

  describe("The SimpleKafka Api") {
    it("Should send data using a producer") {
      // Send data to Kafka
      val kafkaApi = new SimpleKafkaProducer(kafkaHelpers.kafkaSocket(), topic)
      kafkaApi.send[MessageData](MessageData("Hello", "World"))

      // Create consumer
      val consumerProperties =
        TestUtils.createConsumerProperties(kafkaHelpers.zookeeperSocket().toString(), groupId, "consumer0", -1)
      val consumer = kafka.consumer.Consumer.create(new ConsumerConfig(consumerProperties))

      // Shut the connector down even when reading or the assertion fails,
      // so a failing test does not leak the consumer.
      try {
        val topicCountMap = HashMap(topic -> 1)
        val consumerMap = consumer.createMessageStreams(topicCountMap)
        // One stream was requested for the topic, so take the first (and only) one.
        val stream = consumerMap(topic).head
        val iterator = stream.iterator()
        val msg = new String(iterator.next().message(), StandardCharsets.UTF_8)
        msg shouldBe "{\"a\":\"Hello\",\"b\":\"World\"}"
      } finally {
        // cleanup
        consumer.shutdown()
      }
    }
  }
}
开发者ID:frossi85,项目名称:financial-statistics-crawler,代码行数:42,代码来源:SimpleKafkaProducerTest.scala
示例4: SimpleKafkaConsumer
//设置package包名称以及导入依赖的类
package com.example
import java.nio.charset.StandardCharsets
import java.util.Properties
import kafka.consumer.ConsumerConfig
import org.json4s.{DefaultFormats, jackson}
import scala.collection.immutable.HashMap
/**
 * Reads JSON messages from a single topic with the ZooKeeper-based consumer and
 * deserialises them with json4s.
 *
 * @param kafkaSocket     its `toString` is used as the `bootstrap.servers` value
 * @param zooKeeperSocket its `toString` is used as the `zookeeper.connect` value
 * @param groupId         value of the `group.id` consumer property
 * @param topic           topic to consume
 */
class SimpleKafkaConsumer(kafkaSocket: Socket, zooKeeperSocket: Socket, groupId: String, topic: String) {
  /** Builds the consumer properties. */
  private def configuration = {
    val deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    val props = new Properties()
    props.put("bootstrap.servers", kafkaSocket.toString())
    props.put("key.deserializer", deserializer)
    props.put("value.deserializer", deserializer)
    props.put("group.id", groupId)
    props.put("consumer.id", "consumer0")
    // The setting is named "consumer.timeout.ms"; the previous key "consumer.timeout"
    // is not a recognised consumer property.
    props.put("consumer.timeout.ms", "-1")
    props.put("auto.offset.reset", "smallest")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("zookeeper.session.timeout.ms", "6000")
    props.put("zookeeper.connect", zooKeeperSocket.toString())
    props.put("num.consumer.fetchers", "2")
    props.put("rebalance.max.retries", "4")
    props.put("auto.commit.interval.ms", "1000")
    props
  }

  private val consumer = kafka.consumer.Consumer.create(new ConsumerConfig(configuration))

  // Created on first use and then reused: the connector allows message streams to be
  // created only once, so creating them inside read() made a second read() call fail.
  private lazy val messageStream = {
    val consumerMap = consumer.createMessageStreams(HashMap(topic -> 1))
    // One stream was requested for the topic, so take the first (and only) one.
    consumerMap(topic).head
  }

  /**
   * Returns a lazily evaluated sequence of messages deserialised to `T`.
   *
   * Every call draws from the same underlying stream, so a message is delivered to
   * whichever returned sequence is traversed first.
   *
   * @tparam T target type of the JSON deserialisation
   * @return messages decoded as UTF-8 JSON and read into `T`
   */
  def read[T <: AnyRef]()(implicit m: Manifest[T]): Iterable[T] = {
    implicit val serialization = jackson.Serialization
    implicit val formats = DefaultFormats

    messageStream
      .iterator()
      .map(x => serialization.read[T](new String(x.message(), StandardCharsets.UTF_8)))
      .toStream
  }

  /** Shuts down the underlying consumer connector. */
  def shutdown() = {
    consumer.shutdown()
  }
}
开发者ID:frossi85,项目名称:financial-statistics-collector,代码行数:48,代码来源:SimpleKafkaConsumer.scala
示例5: KafkaConsumer
//设置package包名称以及导入依赖的类
package com.knoldus.kafka.consumer
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
/**
 * Pull-style reader on top of the ZooKeeper-based Kafka consumer.
 *
 * The connector and one message stream are created when the instance is constructed.
 *
 * @param topic            expression passed to `Whitelist` to select the topics to consume
 * @param groupId          value of the `group.id` consumer property
 * @param zookeeperConnect value of the `zookeeper.connect` consumer property
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {
  private val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  props.put("auto.offset.reset", "smallest")
  // 2 minute consumer timeout
  props.put("consumer.timeout.ms", "120000")
  // commit offsets every 10 seconds
  props.put("auto.commit.interval.ms", "10000")

  private val config = new ConsumerConfig(props)
  private val connector = Consumer.create(config)
  private val filterSpec = new Whitelist(topic)
  // Exactly one stream is requested, so index 0 is the only stream.
  private val streams =
    connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /**
   * Reads the next message, if any.
   *
   * @return the payload decoded as UTF-8 text, or `None` when the consumer timed out
   *         or an exception occurred while reading
   */
  def read(): Option[String] =
    try {
      if (hasNext()) {
        println("Getting message from queue.............")
        val message = iterator.next().message()
        // Explicit charset: the result no longer depends on the JVM's default encoding.
        Some(new String(message, java.nio.charset.StandardCharsets.UTF_8))
      } else {
        None
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  /** Returns `true` if a message is ready; `false` on consumer timeout or any other exception. */
  private def hasNext(): Boolean =
    try
      iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Getting error when reading message ")
        false
    }

  /** Shuts down the underlying consumer connector. */
  def close(): Unit = connector.shutdown()
}
开发者ID:knoldus,项目名称:activator-kafka-scala-producer-consumer.g8,代码行数:59,代码来源:KafkaConsumer.scala
示例6: kafkaConsumer
//设置package包名称以及导入依赖的类
import java.util
import java.util.Properties
import kafka.consumer.ConsumerConfig
/** Factory for the privately constructed consumer class of the same name. */
object kafkaConsumer {
  def apply(topic: String): kafkaConsumer = new kafkaConsumer(topic)
}

/**
 * Prints every record of one topic using the Java flavour of the ZooKeeper-based consumer.
 *
 * @param topic topic whose records are printed by `start`
 */
class kafkaConsumer private (topic: String) {
  val consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig)

  /** Builds the consumer configuration. */
  def createConsumerConfig: ConsumerConfig = {
    val settings = new Properties()
    // NOTE(review): "zookeeper.connect" and "group.id" are empty strings here;
    // presumably placeholders to be filled in before use.
    Seq(
      "zookeeper.connect" -> "",
      "group.id" -> "",
      "zookeeper.session.timeout.ms" -> "40000",
      "zookeeper.sync.time.ms" -> "200",
      "auto.commit.interval.ms" -> "1000"
    ).foreach { case (key, value) => settings.put(key, value) }
    new ConsumerConfig(settings)
  }

  /** Requests a single stream for the topic and prints each record it yields. */
  def start = {
    val streamCounts = new util.HashMap[String, Integer]()
    streamCounts.put(topic, Integer.valueOf(1))

    val streamsByTopic = consumer.createMessageStreams(streamCounts)
    val firstStream = streamsByTopic.get(topic).get(0)
    for (record <- firstStream) println(record)
  }
}
/** Program entry point. */
object Main {
  /** Builds a consumer for the topic named "topic" and starts it; `args` is not used. */
  def main(args: Array[String]): Unit =
    kafkaConsumer("topic").start
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:38,代码来源:kafkaConsumer.scala
示例7: SimpleConsumer
//设置package包名称以及导入依赖的类
package packt.ch05
import java.util
import java.util.Properties
import kafka.consumer.ConsumerConfig
import SimpleConsumer._
import scala.collection.JavaConversions._
/** Command-line entry point and configuration factory for the `SimpleConsumer` class. */
object SimpleConsumer {
  /**
   * Builds the consumer configuration.
   *
   * @param zookeeper value of the `zookeeper.connect` property
   * @param groupId   value of the `group.id` property
   */
  private def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper)
    props.put("group.id", groupId)
    props.put("zookeeper.session.timeout.ms", "500")
    props.put("zookeeper.sync.time.ms", "250")
    props.put("auto.commit.interval.ms", "1000")
    new ConsumerConfig(props)
  }

  /**
   * Runs the consumer.
   *
   * @param args `zookeeper`, `groupId` and `topic`, in that order; extra arguments are ignored
   */
  def main(args: Array[String]): Unit = args match {
    case Array(zooKeeper, groupId, topic, _*) =>
      new SimpleConsumer(zooKeeper, groupId, topic).testConsumer()
    case _ =>
      // Fewer than three arguments: explain the expected usage instead of failing
      // with an ArrayIndexOutOfBoundsException.
      System.err.println("Usage: SimpleConsumer <zookeeper> <groupId> <topic>")
      sys.exit(1)
  }
}
/**
 * Prints every message of a single topic using the Java flavour of the ZooKeeper-based consumer.
 *
 * @param zookeeper passed to `createConsumerConfig` together with `groupId`
 * @param groupId   consumer group to join
 * @param topic     topic to consume
 */
class SimpleConsumer(zookeeper: String, groupId: String, private val topic: String) {
  private val consumer =
    kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId))

  /**
   * Requests one stream for the topic and prints each message, decoded as UTF-8 text.
   * The connector is shut down when consumption ends, including when it ends with an exception.
   */
  def testConsumer(): Unit =
    try {
      val topicMap = new util.HashMap[String, Integer]()
      topicMap.put(topic, 1)
      val consumerStreamsMap = consumer.createMessageStreams(topicMap)
      val streamList = consumerStreamsMap.get(topic)
      // Iterating the java.util.List relies on the implicit Java-to-Scala collection
      // conversions already imported by this file.
      for (stream <- streamList; aStream <- stream)
        println(
          "Message from Single Topic :: " +
            new String(aStream.message(), java.nio.charset.StandardCharsets.UTF_8))
    } finally {
      // `consumer` is a val assigned from the factory call above, so no null check is needed.
      consumer.shutdown()
    }
}
开发者ID:PacktPublishing,项目名称:Fast-Data-Processing-Systems-with-SMACK-Stack,代码行数:49,代码来源:SimpleConsumer.scala
示例8: KafkaConsumer
//设置package包名称以及导入依赖的类
package services.kafka.consumer
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
import play.api.Logger
/**
 * Pull-style reader on top of the ZooKeeper-based Kafka consumer, logging through Play's `Logger`.
 *
 * The connector and one message stream are created when the instance is constructed.
 *
 * @param topic            expression passed to `Whitelist` to select the topics to consume
 * @param groupId          value of the `group.id` consumer property
 * @param zookeeperConnect value of the `zookeeper.connect` consumer property
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {
  private val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  props.put("auto.offset.reset", "smallest")
  // 2 minute consumer timeout
  props.put("consumer.timeout.ms", "120000")
  // commit offsets every 10 seconds
  props.put("auto.commit.interval.ms", "10000")

  private val config = new ConsumerConfig(props)
  private val connector = Consumer.create(config)
  private val filterSpec = new Whitelist(topic)
  // Exactly one stream is requested, so index 0 is the only stream.
  private val streams =
    connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /**
   * Reads the next message, if any.
   *
   * @return the payload decoded as UTF-8 text, or `None` when the consumer timed out
   *         or an exception occurred while reading
   */
  def read(): Option[String] =
    try {
      if (hasNext()) {
        Logger.info("Getting message from queue.............")
        val message = iterator.next().message()
        // Explicit charset: the result no longer depends on the JVM's default encoding.
        Some(new String(message, java.nio.charset.StandardCharsets.UTF_8))
      } else {
        None
      }
    } catch {
      case ex: Exception =>
        // Route the failure through the logger, as hasNext() does, rather than to stderr.
        Logger.error("Getting error when reading message ", ex)
        None
    }

  /** Returns `true` if a message is ready; `false` on consumer timeout or any other exception. */
  private def hasNext(): Boolean =
    try
      iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        Logger.error("Getting error when reading message ", ex)
        false
    }

  /** Shuts down the underlying consumer connector. */
  def close(): Unit = connector.shutdown()
}
开发者ID:satendrakumar,项目名称:play-kafka-example,代码行数:59,代码来源:KafkaConsumer.scala
示例9: KafkaConsumer
//设置package包名称以及导入依赖的类
package services
import java.util.Properties
import akka.actor.{Props, Actor}
import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig}
import play.Logger
/** Factory for the `Props` used to create a `KafkaConsumer` actor. */
object KafkaConsumer {
  def props(topic: String): Props = Props(new KafkaConsumer(topic))
}

/**
 * Actor that, on receiving a consumer group name, consumes `topic` with the
 * ZooKeeper-based consumer and logs each message at debug level.
 *
 * @param topic topic to consume
 */
class KafkaConsumer(topic: String) extends Actor {
  val props = new Properties()
  // The consumer setting is named "auto.commit.enable"; the previous key "auto.commit"
  // is not a recognised consumer property.
  props.put("auto.commit.enable", "true")
  //props.put("zookeeper.connect", "localhost:2181")
  props.put("zookeeper.connect", "zookeeper1:2181")

  /**
   * Handles a `String` message by treating it as the consumer group id.
   * The message loop runs inside this handler, so the actor handles nothing else
   * while the stream keeps yielding messages.
   */
  def receive = {
    case group: String => {
      props.put("group.id", group)
      val connector = Consumer.create(new ConsumerConfig(props))
      try {
        // One stream was requested for the topic, so take the first (and only) one.
        val stream: KafkaStream[Array[Byte], Array[Byte]] =
          connector.createMessageStreams(Map(topic -> 1)).get(topic).get(0)
        for (message <- stream) {
          try {
            Logger.debug(new String(message.message(), java.nio.charset.StandardCharsets.UTF_8))
          } catch {
            // NonFatal lets OutOfMemoryError, InterruptedException and similar propagate.
            case scala.util.control.NonFatal(e) =>
              Logger.error("error processing message, skipping and resume consumption: " + e)
          }
        }
      } finally {
        // Release the connector however the consumption loop ends.
        connector.shutdown()
      }
    }
  }
}
开发者ID:panchul,项目名称:sb_scala,代码行数:37,代码来源:KafkaConsumer.scala
示例10: KafkaConsumer
//设置package包名称以及导入依赖的类
package controllers
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
/**
 * Pull-style reader on top of the ZooKeeper-based Kafka consumer.
 *
 * The connector and one message stream are created when the instance is constructed.
 *
 * @param topic            expression passed to `Whitelist` to select the topics to consume
 * @param groupId          value of the `group.id` consumer property
 * @param zookeeperConnect value of the `zookeeper.connect` consumer property
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {
  private val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  props.put("auto.offset.reset", "smallest")
  // 2 minute consumer timeout
  props.put("consumer.timeout.ms", "120000")
  // commit offsets every 10 seconds
  props.put("auto.commit.interval.ms", "10000")

  private val config = new ConsumerConfig(props)
  private val connector = Consumer.create(config)
  private val filterSpec = new Whitelist(topic)
  // Exactly one stream is requested, so index 0 is the only stream.
  private val streams =
    connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /**
   * Reads the next message, if any.
   *
   * @return the payload decoded as UTF-8 text, or `None` when the consumer timed out
   *         or an exception occurred while reading
   */
  def read(): Option[String] =
    try {
      if (hasNext()) {
        println("Getting message from queue.............")
        val message = iterator.next().message()
        // Explicit charset: the result no longer depends on the JVM's default encoding.
        Some(new String(message, java.nio.charset.StandardCharsets.UTF_8))
      } else {
        None
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  /** Returns `true` if a message is ready; `false` on consumer timeout or any other exception. */
  private def hasNext(): Boolean =
    try
      iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Getting error when reading message ")
        false
    }

  /** Shuts down the underlying consumer connector. */
  def close(): Unit = connector.shutdown()
}
开发者ID:krishnangc,项目名称:kafka-scala-quartz,代码行数:59,代码来源:KafkaConsumer.scala
注:本文中的kafka.consumer.ConsumerConfig类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论