本文整理汇总了Scala中kafka.consumer.SimpleConsumer类的典型用法代码示例。如果您正苦于以下问题:Scala SimpleConsumer类的具体用法?Scala SimpleConsumer怎么用?Scala SimpleConsumer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SimpleConsumer类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: SimpleConsumerFactory
//设置package包名称以及导入依赖的类
package com.box.castle.consumer
import com.box.kafka.Broker
import kafka.consumer.SimpleConsumer
import scala.concurrent.duration.FiniteDuration
// $COVERAGE-OFF$
/**
 * Factory for Kafka [[kafka.consumer.SimpleConsumer]] instances.
 *
 * Exists as a class (rather than calling the constructor inline) so that
 * callers can substitute a different factory.
 */
class SimpleConsumerFactory {

  /**
   * Creates a consumer connected to the given broker.
   *
   * @param broker        broker whose host and port the consumer targets
   * @param brokerTimeout socket timeout; must be positive and fit in an Int number of milliseconds
   * @param bufferSize    buffer size passed straight through to SimpleConsumer
   * @param clientId      client id whose `value` is passed to SimpleConsumer
   * @throws IllegalArgumentException if the timeout is not positive or exceeds Int.MaxValue milliseconds
   */
  def create(broker: Broker,
             brokerTimeout: FiniteDuration,
             bufferSize: Int,
             clientId: ClientId): SimpleConsumer = {
    val timeoutMs = brokerTimeout.toMillis
    require(timeoutMs > 0, "broker timeout must be positive")
    // SimpleConsumer takes an Int; without this check a large duration would
    // silently wrap around in toInt and could even become negative.
    require(timeoutMs <= Int.MaxValue, "broker timeout must not exceed Int.MaxValue milliseconds")
    new SimpleConsumer(broker.host,
                       broker.port,
                       timeoutMs.toInt,
                       bufferSize,
                       clientId.value)
  }
}
// $COVERAGE-ON$
开发者ID:Box-Castle,项目名称:router,代码行数:25,代码来源:SimpleConsumerFactory.scala
示例2: OffsetManagement
//设置package包名称以及导入依赖的类
package org.hpi.esb.util
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.client.ClientUtils
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.api.TopicMetadata
import org.hpi.esb.commons.util.Logging
/**
 * Looks up partition offsets via topic metadata and a short-lived
 * SimpleConsumer connected to the partition leader.
 *
 * Every failure path logs an error and terminates the JVM with exit code 1.
 */
object OffsetManagement extends Logging {

  /**
   * Returns the offset reported by the partition leader for an offset request
   * with time = -1 and a maximum of one offset.
   *
   * @param topic     topic name
   * @param partition partition id within the topic
   */
  def getNumberOfMessages(topic: String, partition: Int): Long = {
    val clientId = "GetOffset"
    val topicsMetadata = getMetaData(topic, clientId)
    getLatestOffset(topicsMetadata, topic, partition, clientId)
  }

  /**
   * Fetches metadata for exactly one topic from the hard-coded broker list.
   * Exits the process when the response does not contain exactly that topic.
   */
  private def getMetaData(topic: String, clientId: String) = {
    // NOTE(review): broker addresses are hard-coded; consider moving them to configuration.
    val brokerList = "192.168.30.208:9092, 192.168.30.207:9092, 192.168.30.141:9092"
    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
    val maxWaitMs = 1000
    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
    if (topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
      logger.error(s"Error: no valid topic metadata for topic: $topic, probably the topic does not exist, run kafka-list-topic.sh to verify")
      sys.exit(1)
    }
    topicsMetadata
  }

  /**
   * Asks the leader of `partition` for its offsets and returns the last one in the response.
   * Exits the process when the partition or its leader cannot be found.
   */
  private def getLatestOffset(topicsMetadata: Seq[TopicMetadata], topic: String, partition: Int, clientId: String): Long = {
    val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partition)
    val time = -1
    val nOffsets = 1
    partitionMetadataOpt match {
      case Some(metadata) =>
        metadata.leader match {
          case Some(leader) =>
            val timeout = 10000
            val bufferSize = 100000
            val consumer = new SimpleConsumer(leader.host, leader.port, timeout, bufferSize, clientId)
            // Close the consumer on every path; previously it was never closed,
            // leaking one broker connection per call.
            try {
              val topicAndPartition = TopicAndPartition(topic, partition)
              val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
              val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
              offsets.last
            } finally {
              consumer.close()
            }
          case None => logger.error(s"Error: partition $partition does not have a leader. Skip getting offsets"); sys.exit(1)
        }
      case None => logger.error(s"Error: partition $partition does not exist"); sys.exit(1)
    }
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:60,代码来源:OffsetManagement.scala
示例3: KafkaCluster
//设置package包名称以及导入依赖的类
package org.apache.spark.streaming.kafka
// Declared in package org.apache.spark.streaming.kafka because private[spark] object SimpleConsumerConfig is only accessible from inside the spark package
import kafka.api.{OffsetCommitRequest, OffsetCommitResponse}
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal
/**
 * Serializable holder of Kafka connection parameters that can open
 * SimpleConsumer connections to individual brokers.
 *
 * @param kafkaParams raw Kafka settings handed to SimpleConsumerConfig
 */
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {

  /** Accumulator type for errors. */
  type Err = ArrayBuffer[Throwable]

  // Marked transient and rebuilt on demand, so it is not part of the serialized form.
  @transient private var _config: SimpleConsumerConfig = null

  /**
   * Returns the consumer configuration, building it from `kafkaParams` on
   * first access. Access is guarded by this instance's monitor.
   */
  def config: SimpleConsumerConfig = this.synchronized {
    Option(_config).getOrElse {
      // NOTE(review): SimpleConsumerConfig.apply is not shown here; the original
      // inline note indicates it reads "metadata.broker.list" or
      // "bootstrap.servers" from kafkaParams — confirm against its source.
      val built = SimpleConsumerConfig(kafkaParams)
      _config = built
      built
    }
  }

  /** Opens a SimpleConsumer to `host:port` using the settings from [[config]]. */
  def connect(host: String, port: Int): SimpleConsumer = {
    val cfg = config
    new SimpleConsumer(host, port, cfg.socketTimeoutMs, cfg.socketReceiveBufferBytes, cfg.clientId)
  }
}
开发者ID:Dax1n,项目名称:spark-kafka-directstream-zookeeper,代码行数:35,代码来源:KafkaCluster.scala
示例4: newInstance
//设置package包名称以及导入依赖的类
package com.groupon.dse.kafka.common
import com.groupon.dse.configs.{AppConfigs, KafkaServerConfig}
import com.groupon.dse.kafka.common.KafkaException.LeaderNotAvailableException
import com.groupon.dse.kafka.partition.{Leader, Partition}
import com.groupon.dse.zookeeper.ZkClientBuilder
import kafka.consumer.SimpleConsumer
import org.I0Itec.zkclient.exception.ZkException
import org.slf4j.LoggerFactory
/**
 * Builds a SimpleConsumer aimed at the given partition leader.
 *
 * @param leader       supplies the host and port to connect to
 * @param serverConfig supplies the socket timeout and socket buffer size
 * @param clientName   client id passed to the consumer
 * @return a newly constructed SimpleConsumer
 */
def newInstance(leader: Leader,
                serverConfig: KafkaServerConfig,
                clientName: String): SimpleConsumer = {
  val host = leader.host
  val port = leader.port
  val timeout = serverConfig.socketTimeout
  val bufferSize = serverConfig.socketBufferSize
  new SimpleConsumer(host, port, timeout, bufferSize, clientName)
}
}
开发者ID:groupon,项目名称:baryon,代码行数:23,代码来源:ConsumerClientCache.scala
示例5: KafkaOffsetGetterSpec
//设置package包名称以及导入依赖的类
package com.quantifind.kafka.core
import com.quantifind.kafka.core.KafkaOffsetGetter.GroupTopicPartition
import com.quantifind.utils.ZkUtilsWrapper
import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.I0Itec.zkclient.ZkClient
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.{Matchers => MockitoMatchers, Mockito}
import org.scalatest._
/** Behaviour spec for KafkaOffsetGetter, run against mocked ZooKeeper and consumer collaborators. */
class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {

  /** Wires a KafkaOffsetGetter to mocks and registers a mocked consumer for leader id 1. */
  trait Fixture {
    val mockedZkClient = mock(classOf[ZkClient])
    val mockedZkUtil = mock(classOf[ZkUtilsWrapper])
    val mockedConsumer = mock(classOf[SimpleConsumer])
    val testPartitionLeader = 1
    val offsetGetter = new KafkaOffsetGetter(mockedZkClient, mockedZkUtil)
    offsetGetter.consumerMap += (testPartitionLeader -> Some(mockedConsumer))
  }

  "KafkaOffsetGetter" should "be able to build offset data for given partition" in new Fixture {
    // Alias Mockito's `eq` matcher so it cannot be confused with AnyRef.eq.
    import MockitoMatchers.{eq => eqTo}

    val testGroup = "testgroup"
    val testTopic = "testtopic"
    val testPartition = 1
    val topicAndPartition = TopicAndPartition(testTopic, testPartition)
    val groupTopicPartition = GroupTopicPartition(testGroup, topicAndPartition)

    // Committed offset known to the getter: 100.
    val offsetAndMetadata = OffsetAndMetadata(100, "meta", System.currentTimeMillis)
    KafkaOffsetGetter.offsetMap += (groupTopicPartition -> offsetAndMetadata)

    // The mocked ZooKeeper utility reports the fixture's leader for the partition.
    when(mockedZkUtil.getLeaderForPartition(eqTo(mockedZkClient), eqTo(testTopic), eqTo(testPartition)))
      .thenReturn(Some(testPartitionLeader))

    // The mocked consumer answers any offset request with a single offset: 102.
    val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0, Seq(102)))
    val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets)
    when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse)

    val offsetInfo = offsetGetter
      .processPartition(testGroup, testTopic, testPartition)
      .getOrElse(fail("Failed to build offset data"))

    offsetInfo.topic shouldBe testTopic
    offsetInfo.group shouldBe testGroup
    offsetInfo.partition shouldBe testPartition
    offsetInfo.offset shouldBe 100
    offsetInfo.logSize shouldBe 102
  }
}
开发者ID:ZhouGitHub1,项目名称:KafkaOffsetMonitor,代码行数:57,代码来源:KafkaOffsetGetterSpec.scala
示例6: Broker
//设置package包名称以及导入依赖的类
package net.kemuridama.kafcon.model
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import javax.management._
import javax.management.remote._
import kafka.cluster.BrokerEndPoint
import kafka.consumer.SimpleConsumer
import org.joda.time.DateTime
/**
 * A Kafka broker belonging to a cluster, plus helpers that lend short-lived
 * connections (Kafka SimpleConsumer, JMX) to a caller-supplied function.
 *
 * @param id        broker id
 * @param clusterId id of the cluster the broker belongs to
 * @param host      broker host name
 * @param port      broker port used for SimpleConsumer connections
 * @param jmxPort   JMX port, if one is known
 * @param timestamp timestamp associated with this record
 */
case class Broker(
  id: Int,
  clusterId: Int,
  host: String,
  port: Int,
  jmxPort: Option[Int],
  timestamp: DateTime
) {

  /** Converts this broker to Kafka's BrokerEndPoint using id, host and port. */
  def toBrokerEndPoint = BrokerEndPoint(id, host, port)

  /**
   * Runs `func` with a freshly created SimpleConsumer inside a Future.
   *
   * The consumer is closed whether `func` returns or throws; previously an
   * exception in `func` skipped the close and leaked the connection.
   *
   * @return a Future holding the result of `func`, or its failure
   */
  def withSimpleConsumer[T](func: SimpleConsumer => T): Future[T] = Future {
    val consumer = new SimpleConsumer(host, port, 3000, 65536, "kafcon-consumer")
    try func(consumer)
    finally consumer.close()
  }

  /**
   * Runs `func` with an MBeanServerConnection to this broker's JMX endpoint inside a Future.
   *
   * The JMX connector is closed whether `func` returns or throws. The Future
   * fails with a RuntimeException (from `sys.error`) when no JMX port is set.
   *
   * @return a Future holding the result of `func`, or its failure
   */
  def withMBeanServerConnection[T](func: MBeanServerConnection => T): Future[T] = Future {
    jmxPort match {
      case Some(jmx) =>
        val jmxServiceUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host, jmx))
        val jmxConnector = JMXConnectorFactory.connect(jmxServiceUrl)
        try func(jmxConnector.getMBeanServerConnection)
        finally jmxConnector.close()
      case _ => sys.error("Not found JMX port")
    }
  }
}
开发者ID:kemuridama,项目名称:kafcon,代码行数:44,代码来源:Broker.scala
示例7: SimpleConsumers
//设置package包名称以及导入依赖的类
package com.godatadriven.kafka.offset
import com.google.gson.{Gson, GsonBuilder}
import kafka.consumer.SimpleConsumer
import org.apache.zookeeper.ZooKeeper
import scala.collection.JavaConversions._
/**
 * Builds one SimpleConsumer per broker id found under "/brokers/ids" in
 * ZooKeeper, at construction time, and exposes them keyed by broker id.
 *
 * @param zookeeper connected ZooKeeper client used to read the broker registrations
 */
class SimpleConsumers(zookeeper: ZooKeeper) {
  val gson: Gson = new GsonBuilder().create()

  /** Broker id (the ZooKeeper child node name) mapped to a consumer for that broker. */
  val children: Map[String, SimpleConsumer] = zookeeper.getChildren("/brokers/ids", false).map(id => {
    // Decode explicitly as UTF-8 instead of the platform default charset, so
    // the JSON is read the same way regardless of the JVM's file.encoding.
    val brokerInfoJson: String =
      new String(zookeeper.getData("/brokers/ids/" + id, false, null), java.nio.charset.StandardCharsets.UTF_8)
    val brokerInfo = gson.fromJson(brokerInfoJson, classOf[BrokerInfo])
    id -> new SimpleConsumer(brokerInfo.getHost, brokerInfo.getPort, 10000, 100000, "consumerOffsetChecker")
  }).toMap

  /** Returns the consumer registered for broker id `key`, if any. */
  def get(key: String): Option[SimpleConsumer] = {
    children.get(key)
  }

  /** Closes every consumer held by this instance. */
  def close(): Unit = {
    children.foreach(_._2.close())
  }
}
开发者ID:godatadriven,项目名称:prometheus-kafka-offsets,代码行数:27,代码来源:SimpleConsumers.scala
示例8: kafkaHelper
//设置package包名称以及导入依赖的类
package me.jie.ksrdd
import kafka.api.TopicMetadataRequest
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.SimpleConsumer
/**
 * Leader discovery and consumer construction on top of Kafka's SimpleConsumer.
 *
 * @param config supplies the broker list, socket settings, consumer id and retry settings
 */
class kafkaHelper(config: kafkaConfig) {
  private val brokers = config.metadataBrokerList.split(",").map(kafkaBroker(_))
  private val socketTimeoutMs = config.socketTimeoutMs
  private val socketReceiveBufferBytes = config.socketReceiveBufferBytes
  private val consumerId = config.consumerId
  private val retries = config.retries
  private val refreshLeaderBackoffMs = config.refreshLeaderBackoffMs

  /**
   * Finds the current leader of `topicAndPartition`.
   *
   * Each attempt queries the configured brokers in order and stops at the
   * first one that reports a leader. After an unsuccessful attempt the thread
   * sleeps for `refreshLeaderBackoffMs` before the next one, up to `retries`
   * attempts.
   *
   * @throws Exception with message "Find leader failed!" when no attempt finds a leader
   */
  def findLeader(topicAndPartition: TopicAndPartition): kafkaBroker =
    Stream(1 to retries: _*).map { _ =>
      val found = brokers.toStream
        .map(leaderReportedBy(_, topicAndPartition))
        .collectFirst { case Some(leader) => leader }
      if (found.isEmpty) Thread.sleep(refreshLeaderBackoffMs)
      found
    }.collectFirst { case Some(leader) => leader } match {
      case Some(leader) => leader
      case None => throw new Exception("Find leader failed!")
    }

  /**
   * Asks a single broker for topic metadata and extracts the leader of the
   * requested partition. Returns None when the broker cannot be queried or
   * reports no error-free leader for that partition.
   */
  private def leaderReportedBy(broker: kafkaBroker, topicAndPartition: TopicAndPartition): Option[kafkaBroker] = {
    val consumer = new SimpleConsumer(broker.host, broker.port, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
    try {
      val request = new TopicMetadataRequest(Seq(topicAndPartition.topic), 0)
      // Use total filters rather than guarded `case` lambdas: the latter threw
      // MatchError on any non-matching topic or partition, which the catch-all
      // then turned into a failed lookup.
      consumer.send(request).topicsMetadata
        .filter(tm => tm.errorCode == ErrorMapping.NoError && tm.topic == topicAndPartition.topic)
        .flatMap(_.partitionsMetadata)
        .filter(pm => pm.errorCode == ErrorMapping.NoError && pm.partitionId == topicAndPartition.partition)
        .flatMap(_.leader)
        .headOption
        .map(leader => kafkaBroker(leader.host, leader.port))
    } catch {
      // Best-effort per broker, but let fatal errors (OOM, interrupts, ...) propagate.
      case scala.util.control.NonFatal(_) => None
    } finally {
      consumer.close()
    }
  }

  /** Creates a SimpleConsumer for the given broker using this helper's socket settings and consumer id. */
  def buildConsumer(broker: kafkaBroker): SimpleConsumer = {
    val kafkaBroker(leaderHost, leaderPort) = broker
    new SimpleConsumer(leaderHost, leaderPort, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
  }
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:52,代码来源:kafkaHelper.scala
示例9: ChunkConsumer
//设置package包名称以及导入依赖的类
package example.consumer
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer
/**
 * Consumer that issues a single fetch request per `read()` call through a SimpleConsumer.
 *
 * @param topics    topics to fetch from
 * @param partition partition id used for every topic
 * @param offset    offset the fetch starts at, used for every topic
 * @param fetchSize fetch size passed to each addFetch call
 */
case class ChunkConsumer(topics: List[String], partition: Int = 0, offset: Long = 0L, fetchSize: Int = 100) extends Consumer(topics) {
  private val clientId = kafkaConfig.getCustomString("consumer.clientId")

  /** Underlying consumer, configured from the "consumer.*" settings of kafkaConfig. */
  val simpleConsumer = new SimpleConsumer(
    kafkaConfig.getCustomString("consumer.host"),
    kafkaConfig.getCustomInt("consumer.port"),
    kafkaConfig.getCustomInt("consumer.timeOut"),
    kafkaConfig.getCustomInt("consumer.bufferSize"),
    clientId)

  /**
   * Fetches once from every configured topic and returns the message payloads decoded as strings.
   *
   * Payloads are decoded as UTF-8 rather than with the platform default
   * charset, so results do not depend on the JVM's file.encoding.
   * NOTE(review): assumes producers write UTF-8 text — confirm.
   */
  def read(): Iterable[String] = {
    val fetchRequest = new FetchRequestBuilder().clientId(clientId)
    topics.foreach { topic =>
      fetchRequest.addFetch(topic, partition, offset, fetchSize)
    }
    val fetchResponse = simpleConsumer.fetch(fetchRequest.build())
    fetchResponse.data.values.flatMap { partitionData =>
      partitionData.messages.toList.map { mao =>
        val payload = mao.message.payload
        // Size the array by the bytes actually left to read, so a buffer whose
        // position is not 0 cannot cause a BufferUnderflowException.
        val data = new Array[Byte](payload.remaining)
        payload.get(data)
        new String(data, java.nio.charset.StandardCharsets.UTF_8)
      }
    }
  }
}
开发者ID:alonsoir,项目名称:hello-kafka-twitter-scala,代码行数:39,代码来源:ChunkConsumer.scala
示例10: ZookeeperOffsetMetadataManagerFactory
//设置package包名称以及导入依赖的类
package com.box.castle.consumer.offsetmetadatamanager
import java.nio.file.Path
import java.util.concurrent.TimeUnit._
import kafka.consumer.SimpleConsumer
import org.apache.curator.RetryPolicy
import org.apache.curator.retry.ExponentialBackoffRetry
import scala.concurrent.duration.{FiniteDuration, Duration}
/**
 * Factory producing ZookeeperOffsetMetadataManager instances that all share
 * the connection settings given at construction time.
 *
 * @param rootNameSpace            passed through to each created manager
 * @param connectionEndpoint       passed through to each created manager
 * @param retryPolicy              Curator retry policy; defaults to ExponentialBackoffRetry(1500, 3)
 * @param initialConnectionTimeout defaults to 10 seconds
 * @param connectionTimeout        defaults to 10 seconds
 * @param sessionTimeout           defaults to 30 seconds
 */
class ZookeeperOffsetMetadataManagerFactory(
    rootNameSpace: Path,
    connectionEndpoint: String,
    retryPolicy: RetryPolicy = new ExponentialBackoffRetry(1500, 3),
    initialConnectionTimeout: Duration = FiniteDuration(10, SECONDS),
    connectionTimeout: Duration = FiniteDuration(10, SECONDS),
    sessionTimeout: Duration = FiniteDuration(30, SECONDS))
  extends OffsetMetadataManagerFactory {

  /**
   * Creates a new manager from this factory's settings.
   *
   * @param consumer accepted but not used by this implementation
   */
  def create(consumer: SimpleConsumer): ZookeeperOffsetMetadataManager =
    new ZookeeperOffsetMetadataManager(
      rootNameSpace,
      connectionEndpoint,
      retryPolicy,
      initialConnectionTimeout,
      connectionTimeout,
      sessionTimeout)
}
开发者ID:Box-Castle,项目名称:router,代码行数:24,代码来源:ZookeeperOffsetMetadataManagerFactory.scala
示例11: RunningEmbeddedKafka
//设置package包名称以及导入依赖的类
package com.pygmalios.sparkCheckpointExperience.kafka.embedded
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.Logging
import scala.concurrent.duration._
/**
 * Handle on a running embedded Kafka: publishes messages through `producer`
 * and logs the offset range of partition 0 of `topic` after each publish.
 *
 * @param producer producer used to send records
 * @param consumer consumer used for offset requests
 * @param topic    topic whose partition 0 offsets are queried
 */
class RunningEmbeddedKafka(producer: KafkaProducer[String, String],
                           consumer: SimpleConsumer,
                           topic: String) extends Logging {

  /** Partition 0 of the topic given at construction. */
  val topicAndPartition = TopicAndPartition(topic, 0)

  /** Publishes `message` to `topic` with a null key. */
  def publish(topic: String, message: String): Unit = publish(topic, null, message)

  /**
   * Sends one record, waits up to 3 seconds for the send to complete, then
   * logs the earliest and latest offsets of [[topicAndPartition]].
   */
  def publish(topic: String, key: String, message: String): Unit = {
    val record = new ProducerRecord[String, String](topic, key, message)
    producer.send(record).get(3, SECONDS)

    val latest = getOffset(OffsetRequest.LatestTime)
    val earliest = getOffset(OffsetRequest.EarliestTime)
    log.info(f"$topic [$earliest%3d:$latest%3d]: $key%3s -> $message%3s")
  }

  /** Requests up to 100 offsets before `time` and returns the first one in the response. */
  private def getOffset(time: Long): Long = {
    val requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(time, 100))
    val reply = consumer.getOffsetsBefore(OffsetRequest(requestInfo))
    reply.partitionErrorAndOffsets(topicAndPartition).offsets.head
  }
}
开发者ID:pygmalios,项目名称:spark-checkpoint-experience,代码行数:31,代码来源:RunningEmbeddedKafka.scala
注:本文中的kafka.consumer.SimpleConsumer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论