
Scala SimpleConsumer Class Code Examples


This article collects typical usage examples of the kafka.consumer.SimpleConsumer class in Scala. If you are unsure what SimpleConsumer does, how to use it, or what working code looks like, the curated examples below should help.



Eleven code examples of the SimpleConsumer class are shown below, ordered by popularity.

Example 1: SimpleConsumerFactory

// Package name and imported dependencies
package com.box.castle.consumer

import com.box.kafka.Broker
import kafka.consumer.SimpleConsumer

import scala.concurrent.duration.FiniteDuration

// $COVERAGE-OFF$
class SimpleConsumerFactory {

  def create(broker: Broker,
             brokerTimeout: FiniteDuration,
             bufferSize: Int,
             clientId: ClientId): SimpleConsumer = {
    require(brokerTimeout.toMillis > 0, "broker timeout must be positive")
    new SimpleConsumer(broker.host,
                       broker.port,
                       brokerTimeout.toMillis.toInt,
                       bufferSize,
                       clientId.value)
  }
}

// $COVERAGE-ON$ 
Developer: Box-Castle | Project: router | Lines: 25 | Source: SimpleConsumerFactory.scala
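A minimal usage sketch, assuming Broker is a host/port case class and ClientId wraps a plain string (neither definition appears in this excerpt):

// Hypothetical usage; the Broker(host, port) and ClientId(value) constructors are assumptions.
import scala.concurrent.duration._

val factory = new SimpleConsumerFactory
val consumer = factory.create(
  broker        = Broker("localhost", 9092),  // assumed constructor
  brokerTimeout = 30.seconds,
  bufferSize    = 64 * 1024,
  clientId      = ClientId("example-client")  // assumed constructor
)
try {
  // issue fetch or offset requests here
} finally consumer.close()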


Example 2: OffsetManagement

// Package name and imported dependencies
package org.hpi.esb.util

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.client.ClientUtils
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.api.TopicMetadata
import org.hpi.esb.commons.util.Logging

object OffsetManagement extends Logging {

  def getNumberOfMessages(topic: String, partition: Int): Long = {

    val clientId = "GetOffset"

    val topicsMetadata = getMetaData(topic, clientId)
    getLatestOffset(topicsMetadata, topic, partition, clientId)
  }

  private def getMetaData(topic: String, clientId: String) = {
    val brokerList = "192.168.30.208:9092, 192.168.30.207:9092, 192.168.30.141:9092"
    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)


    val maxWaitMs = 1000
    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
    if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
      logger.error(s"Error: no valid topic metadata for topic: $topic, probably the topic does not exist, run kafka-list-topic.sh to verify")
      sys.exit(1)
    }

    topicsMetadata
  }

  private def getLatestOffset(topicsMetadata: Seq[TopicMetadata], topic: String, partition: Int, clientId: String) = {

    val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partition)
    val time = -1      // -1 == OffsetRequest.LatestTime: ask for the log-end offset
    val nOffsets = 1

    partitionMetadataOpt match {
      case Some(metadata) =>
        metadata.leader match {
          case Some(leader) =>
            val timeout = 10000
            val bufferSize = 100000
            val consumer = new SimpleConsumer(leader.host, leader.port, timeout, bufferSize, clientId)
            val topicAndPartition = TopicAndPartition(topic, partition)
            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
            val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

            offsets.last
          case None => logger.error(s"Error: partition $partition does not have a leader. Skip getting offsets"); sys.exit(1)
        }
      case None => logger.error(s"Error: partition $partition does not exist"); sys.exit(1)
    }
  }

} 
Developer: BenReissaus | Project: EnterpriseStreamingBenchmark | Lines: 60 | Source: OffsetManagement.scala
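Usage is a single call; note that the broker list is hard-coded inside getMetaData, so this sketch only resolves against those brokers:

// Returns the log-end offset of the partition (the message count for a log that starts at offset 0).
val count: Long = OffsetManagement.getNumberOfMessages("my-topic", partition = 0)
println(s"messages in my-topic/0: $count")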


Example 3: KafkaCluster

// Package name and imported dependencies
package org.apache.spark.streaming.kafka

// Must live in the org.apache.spark.streaming.kafka package: SimpleConsumerConfig is
// declared private[spark], so it is only accessible from inside the spark namespace.

import kafka.api.{OffsetCommitRequest, OffsetCommitResponse}
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   type Err = ArrayBuffer[Throwable]

   @transient private var _config: SimpleConsumerConfig = null

   def config: SimpleConsumerConfig = this.synchronized {
     if (_config == null) {
       // Built via SimpleConsumerConfig.apply:
       // val brokers = kafkaParams.get("metadata.broker.list").orElse(kafkaParams.get("bootstrap.servers"))
       // i.e. it reads the value of "metadata.broker.list" (falling back to "bootstrap.servers") from kafkaParams.
       _config = SimpleConsumerConfig(kafkaParams)
     }
     _config
   }

  
   def connect(host: String, port: Int): SimpleConsumer =
     new SimpleConsumer(host, port, config.socketTimeoutMs,
       config.socketReceiveBufferBytes, config.clientId)
 } 
Developer: Dax1n | Project: spark-kafka-directstream-zookeeper | Lines: 35 | Source: KafkaCluster.scala
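A hedged usage sketch: SimpleConsumerConfig(kafkaParams) reads "metadata.broker.list" (or "bootstrap.servers") plus the usual socket settings, so a minimal parameter map looks like this:

val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))
val consumer: SimpleConsumer = kc.connect("localhost", 9092)
try {
  // send OffsetRequest / FetchRequest against this broker here
} finally consumer.close()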


Example 4: newInstance

// Package name and imported dependencies
package com.groupon.dse.kafka.common

import com.groupon.dse.configs.{AppConfigs, KafkaServerConfig}
import com.groupon.dse.kafka.common.KafkaException.LeaderNotAvailableException
import com.groupon.dse.kafka.partition.{Leader, Partition}
import com.groupon.dse.zookeeper.ZkClientBuilder
import kafka.consumer.SimpleConsumer
import org.I0Itec.zkclient.exception.ZkException
import org.slf4j.LoggerFactory


// Enclosing object inferred from the source file name; the excerpt omits its declaration.
object ConsumerClientCache {

  def newInstance(leader: Leader,
                  serverConfig: KafkaServerConfig,
                  clientName: String)
  : SimpleConsumer = new SimpleConsumer(
    leader.host,
    leader.port,
    serverConfig.socketTimeout,
    serverConfig.socketBufferSize,
    clientName
  )
} 
Developer: groupon | Project: baryon | Lines: 23 | Source: ConsumerClientCache.scala
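Creating a consumer is then a one-liner once the partition leader is known. Leader and KafkaServerConfig are project types not shown here, so their construction below is hypothetical:

// Hypothetical: Leader(host, port) is assumed; serverConfig would be built from AppConfigs.
val serverConfig: KafkaServerConfig = ??? // loaded elsewhere (assumed)
val consumer = ConsumerClientCache.newInstance(
  Leader("localhost", 9092),
  serverConfig,
  clientName = "baryon-client"
)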


Example 5: KafkaOffsetGetterSpec

// Package name and imported dependencies
package com.quantifind.kafka.core

import com.quantifind.kafka.core.KafkaOffsetGetter.GroupTopicPartition
import com.quantifind.utils.ZkUtilsWrapper
import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.I0Itec.zkclient.ZkClient
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.{Matchers => MockitoMatchers, Mockito}
import org.scalatest._

class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {

  trait Fixture {

    val mockedZkClient = Mockito.mock(classOf[ZkClient])
    val mockedZkUtil =  Mockito.mock(classOf[ZkUtilsWrapper])
    val mockedConsumer = Mockito.mock(classOf[SimpleConsumer])
    val testPartitionLeader = 1

    val offsetGetter = new KafkaOffsetGetter(mockedZkClient, mockedZkUtil)
    offsetGetter.consumerMap += (testPartitionLeader -> Some(mockedConsumer))
  }

  "KafkaOffsetGetter" should "be able to build offset data for given partition" in new Fixture {

    val testGroup = "testgroup"
    val testTopic = "testtopic"
    val testPartition = 1

    val topicAndPartition = TopicAndPartition(testTopic, testPartition)
    val groupTopicPartition = GroupTopicPartition(testGroup, topicAndPartition)
    val offsetAndMetadata = OffsetAndMetadata(100, "meta", System.currentTimeMillis)
    KafkaOffsetGetter.offsetMap += (groupTopicPartition -> offsetAndMetadata)

    when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(mockedZkClient), MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition)))
      .thenReturn(Some(testPartitionLeader))

    val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0,Seq(102)))
    val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets)
    when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse)

    offsetGetter.processPartition(testGroup, testTopic, testPartition) match {
      case Some(offsetInfo) =>
        offsetInfo.topic shouldBe testTopic
        offsetInfo.group shouldBe testGroup
        offsetInfo.partition shouldBe testPartition
        offsetInfo.offset shouldBe 100
        offsetInfo.logSize shouldBe 102
      case None => fail("Failed to build offset data")
    }
    
  }
} 
Developer: ZhouGitHub1 | Project: KafkaOffsetMonitor | Lines: 57 | Source: KafkaOffsetGetterSpec.scala


Example 6: Broker

// Package name and imported dependencies
package net.kemuridama.kafcon.model

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import javax.management._
import javax.management.remote._

import kafka.cluster.BrokerEndPoint
import kafka.consumer.SimpleConsumer
import org.joda.time.DateTime

case class Broker(
  id: Int,
  clusterId: Int,
  host: String,
  port: Int,
  jmxPort: Option[Int],
  timestamp: DateTime
) {

  def toBrokerEndPoint = BrokerEndPoint(id, host, port)

  def withSimpleConsumer[T](func: SimpleConsumer => T): Future[T] = Future {
    val consumer = new SimpleConsumer(host, port, 3000, 65536, "kafcon-consumer")
    val ret = func(consumer)
    consumer.close()
    ret
  }

  def withMBeanServerConnection[T](func: MBeanServerConnection => T): Future[T] = Future {
    jmxPort match {
      case Some(port) => {
        val jmxServiceUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host, port))
        val jmxConnector = JMXConnectorFactory.connect(jmxServiceUrl)
        val ret = func(jmxConnector.getMBeanServerConnection)
        jmxConnector.close()
        ret
      }
      case _ => sys.error("Not found JMX port")
    }
  }

} 
Developer: kemuridama | Project: kafcon | Lines: 44 | Source: Broker.scala
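The loan-pattern helper makes one-shot broker queries compact, and it closes the consumer for you once the function returns. A sketch that fetches the latest offset of a partition, reusing the OffsetRequest API from the other examples:

import scala.concurrent.Future
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition

val broker = Broker(1, 1, "localhost", 9092, None, DateTime.now())

val latest: Future[Long] = broker.withSimpleConsumer { consumer =>
  val tp  = TopicAndPartition("my-topic", 0)
  val req = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
  consumer.getOffsetsBefore(req).partitionErrorAndOffsets(tp).offsets.head
}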


Example 7: SimpleConsumers

// Package name and imported dependencies
package com.godatadriven.kafka.offset

import com.google.gson.{Gson, GsonBuilder}
import kafka.consumer.SimpleConsumer
import org.apache.zookeeper.ZooKeeper

import scala.collection.JavaConversions._

class SimpleConsumers(zookeeper: ZooKeeper) {
  val gson: Gson = new GsonBuilder().create()

  val children: Map[String, SimpleConsumer] = zookeeper.getChildren("/brokers/ids", false).map(id => {
    val brokerInfoJson: String = new String(zookeeper.getData("/brokers/ids/" + id, false, null))

    val brokerInfo = gson.fromJson(brokerInfoJson, classOf[BrokerInfo])
    id -> new SimpleConsumer(brokerInfo.getHost, brokerInfo.getPort, 10000, 100000, "consumerOffsetChecker")
  }).toMap

  def get(key: String): Option[SimpleConsumer] = {
    children.get(key)
  }

  def close(): Unit = {
    children.foreach(_._2.close())
  }
} 
Developer: godatadriven | Project: prometheus-kafka-offsets | Lines: 27 | Source: SimpleConsumers.scala
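A usage sketch; the ensemble address is a placeholder and BrokerInfo is assumed to deserialize the standard /brokers/ids/<id> JSON (host and port fields):

val zk = new ZooKeeper("localhost:2181", 10000, null) // no watcher needed for one-shot reads
val consumers = new SimpleConsumers(zk)
try {
  consumers.get("0").foreach { c =>
    println(s"broker 0 reachable at ${c.host}:${c.port}")
  }
} finally {
  consumers.close()
  zk.close()
}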


Example 8: kafkaHelper

// Package name and imported dependencies
package me.jie.ksrdd

import kafka.api.TopicMetadataRequest
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.SimpleConsumer


class kafkaHelper(config: kafkaConfig) {
    private val brokers = config.metadataBrokerList.split(",").map(kafkaBroker(_))
    private val socketTimeoutMs = config.socketTimeoutMs
    private val socketReceiveBufferBytes = config.socketReceiveBufferBytes
    private val consumerId = config.consumerId
    private val retries = config.retries
    private val refreshLeaderBackoffMs = config.refreshLeaderBackoffMs

    def findLeader(topicAndPartition: TopicAndPartition): kafkaBroker =
      Stream(1 to retries: _*).map { _ =>
          brokers.toStream.map { broker =>
            val consumer = new SimpleConsumer(broker.host, broker.port, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
            try {
              consumer.send(new TopicMetadataRequest(Seq(topicAndPartition.topic), 0)).topicsMetadata.toStream.flatMap {
                case topicMeta if (topicMeta.errorCode == ErrorMapping.NoError && topicMeta.topic == topicAndPartition.topic) =>
                  topicMeta.partitionsMetadata
              }.map {
                case partitionMetadata if (partitionMetadata.errorCode == ErrorMapping.NoError &&
                  partitionMetadata.partitionId == topicAndPartition.partition) =>
                  partitionMetadata.leader
              } collectFirst {
                case Some(broker) => kafkaBroker(broker.host, broker.port)
              }
            } catch {
              case _: Throwable => None
            } finally {
              consumer.close()
            }
          } collectFirst {
            case Some(broker) => broker
          }
      } filter{
        case Some(_) => true
        case None    => Thread.sleep(refreshLeaderBackoffMs); false
      } collectFirst { case Some(broker) => broker} match {
        case Some(broker) => broker
        case None         => throw new Exception("Find leader failed!")
      }

    def buildConsumer(broker: kafkaBroker): SimpleConsumer = {
      val kafkaBroker(leaderHost, leaderPort) = broker
      new SimpleConsumer(leaderHost, leaderPort, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
    }
} 
Developer: JensenFeng | Project: KSRdd | Lines: 52 | Source: kafkaHelper.scala
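A hedged sketch of the find-leader-then-connect flow; kafkaConfig and kafkaBroker are project case classes not shown in this excerpt, so the config construction is assumed:

import kafka.common.TopicAndPartition

val config: kafkaConfig = ??? // loaded from the application's settings (assumed)
val helper = new kafkaHelper(config)
val leader: kafkaBroker = helper.findLeader(TopicAndPartition("my-topic", 0))
val consumer = helper.buildConsumer(leader)
try {
  // fetch from the partition leader here
} finally consumer.close()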


Example 9: ChunkConsumer

// Package name and imported dependencies
package example.consumer

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

case class ChunkConsumer(topics: List[String], partition: Int = 0, offset: Long = 0L, fetchSize: Int = 100) extends Consumer(topics){

  private val clientId = kafkaConfig.getCustomString("consumer.clientId")

  val simpleConsumer = new SimpleConsumer(
    kafkaConfig.getCustomString("consumer.host"),
    kafkaConfig.getCustomInt("consumer.port"),
    kafkaConfig.getCustomInt("consumer.timeOut"),
    kafkaConfig.getCustomInt("consumer.bufferSize"),
    clientId)

  def read(): Iterable[String] = {

    //println(simpleConsumer.toString)
    val fetchRequest = new FetchRequestBuilder().clientId(clientId)
    for(topic <- topics) {
      fetchRequest.addFetch(topic, partition, offset, fetchSize)
    }

    val fetchResponse = simpleConsumer.fetch(fetchRequest.build())

    fetchResponse.data.values.flatMap { topic =>
      topic.messages.toList.map { mao =>
        val payload =  mao.message.payload

        //ugliest part of the code. Thanks to kafka
        val data = Array.fill[Byte](payload.limit)(0)
        payload.get(data)
        new String(data)
      }
    }
  }
} 
Developer: alonsoir | Project: hello-kafka-twitter-scala | Lines: 39 | Source: ChunkConsumer.scala
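Reading a chunk is then a single call. The Consumer base class and the kafkaConfig custom settings come from the surrounding project, so this sketch assumes they are configured. Note that offset is fixed at construction, so repeated read() calls return the same chunk:

val chunk = ChunkConsumer(List("tweets"))   // partition 0, offset 0, fetchSize 100 by default
chunk.read().foreach(println)               // each message payload decoded as a String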


Example 10: ZookeeperOffsetMetadataManagerFactory

// Package name and imported dependencies
package com.box.castle.consumer.offsetmetadatamanager

import java.nio.file.Path
import java.util.concurrent.TimeUnit._
import kafka.consumer.SimpleConsumer
import org.apache.curator.RetryPolicy
import org.apache.curator.retry.ExponentialBackoffRetry

import scala.concurrent.duration.{FiniteDuration, Duration}

class ZookeeperOffsetMetadataManagerFactory(rootNameSpace: Path,
                                            connectionEndpoint: String,
                                            retryPolicy: RetryPolicy = new ExponentialBackoffRetry(1500, 3),
                                            initialConnectionTimeout: Duration = FiniteDuration(10, SECONDS),
                                            connectionTimeout: Duration = FiniteDuration(10, SECONDS),
                                            sessionTimeout: Duration = FiniteDuration(30, SECONDS))
  extends OffsetMetadataManagerFactory {

  def create(consumer: SimpleConsumer): ZookeeperOffsetMetadataManager = {
    new ZookeeperOffsetMetadataManager(rootNameSpace, connectionEndpoint, retryPolicy,
      initialConnectionTimeout, connectionTimeout, sessionTimeout)
  }
} 
Developer: Box-Castle | Project: router | Lines: 24 | Source: ZookeeperOffsetMetadataManagerFactory.scala
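A wiring sketch; the namespace path and ZooKeeper endpoint are placeholders. Note that create accepts a SimpleConsumer only to satisfy the OffsetMetadataManagerFactory interface and does not use it:

import java.nio.file.Paths

val factory = new ZookeeperOffsetMetadataManagerFactory(
  rootNameSpace      = Paths.get("/castle/offsets"),
  connectionEndpoint = "localhost:2181"
)
val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "castle-offsets")
val manager  = factory.create(consumer)   // consumer is ignored by this implementation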


Example 11: RunningEmbeddedKafka

// Package name and imported dependencies
package com.pygmalios.sparkCheckpointExperience.kafka.embedded

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.Logging

import scala.concurrent.duration._

class RunningEmbeddedKafka(producer: KafkaProducer[String, String],
                           consumer: SimpleConsumer,
                           topic: String) extends Logging {
  val topicAndPartition = TopicAndPartition(topic, 0)

  def publish(topic: String, message: String): Unit = publish(topic, null, message)

  def publish(topic: String, key: String, message: String): Unit = {
    producer.send(new ProducerRecord(topic, key, message)).get(3, SECONDS)

    val latest    = getOffset(OffsetRequest.LatestTime)
    val earliest  = getOffset(OffsetRequest.EarliestTime)
    log.info(f"$topic [$earliest%3d:$latest%3d]: $key%3s -> $message%3s")
  }

  private def getOffset(time: Long): Long = {
    val response = consumer.getOffsetsBefore(OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, 100))))
    response.partitionErrorAndOffsets(topicAndPartition).offsets.head
  }
} 
Developer: pygmalios | Project: spark-checkpoint-experience | Lines: 31 | Source: RunningEmbeddedKafka.scala
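A usage sketch; in the real project the producer and consumer come from the embedded-Kafka harness, so their construction here is illustrative:

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
val consumer = new SimpleConsumer("localhost", 9092, 3000, 64 * 1024, "embedded-test")

val kafka = new RunningEmbeddedKafka(producer, consumer, "test-topic")
kafka.publish("test-topic", "key-1", "hello")  // logs the earliest/latest offsets after the send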



Note: the kafka.consumer.SimpleConsumer class examples in this article were collected from open-source projects hosted on GitHub, MSDocs, and similar platforms. The snippets were selected from community-contributed projects; copyright of the source code remains with the original authors, and distribution and use must follow each project's License. Do not reproduce without permission.

