Java TopicMetadata Class Code Examples

This article collects typical usage examples of kafka.api.TopicMetadata in Java. If you are wondering how the TopicMetadata class is used in practice, the selected examples below may help.



The TopicMetadata class belongs to the kafka.api package. Seven code examples are shown below, sorted by popularity by default.

Example 1: testClearStream

import kafka.api.TopicMetadata; // import the required package/class
@Test
public void testClearStream() {
  KafkaSystemAdmin admin = this.basicSystemAdmin;
  StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);

  assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
  assertTrue(admin.clearStream(spec));

  scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());
  scala.collection.immutable.Map<String, TopicMetadata> metadata = admin.getTopicMetadata(topic);
  assertTrue(metadata.get(spec.getPhysicalName()).get().partitionsMetadata().isEmpty());
}
 
Developer ID: apache, Project: samza, Lines of code: 13, Source file: TestKafkaSystemAdminJava.java


Example 2: send

import kafka.api.TopicMetadata; // import the required package/class
@Override
public TopicMetadataResponse send(TopicMetadataRequest request) {
  java.util.List<String> topics = request.topics();
  TopicMetadata[] topicMetadataArray = new TopicMetadata[topics.size()];

  for (int i = 0; i < topicMetadataArray.length; i++) {
    String topic = topics.get(i);
    if (!topic.equals(topicName)) {
      topicMetadataArray[i] = new TopicMetadata(topic, null, Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
    } else {
      PartitionMetadata[] partitionMetadataArray = new PartitionMetadata[partitionCount];
      for (int j = 0; j < partitionCount; j++) {
        java.util.List<BrokerEndPoint> emptyJavaList = Collections.emptyList();
        List<BrokerEndPoint> emptyScalaList = JavaConversions.asScalaBuffer(emptyJavaList).toList();
        partitionMetadataArray[j] = new PartitionMetadata(j, Some.apply(brokerArray[partitionLeaderIndices[j]]),
            emptyScalaList, emptyScalaList, Errors.NONE.code());
      }

      Seq<PartitionMetadata> partitionsMetadata = List.fromArray(partitionMetadataArray);
      topicMetadataArray[i] = new TopicMetadata(topic, partitionsMetadata, Errors.NONE.code());
    }
  }

  Seq<BrokerEndPoint> brokers = List.fromArray(brokerArray);
  Seq<TopicMetadata> topicsMetadata = List.fromArray(topicMetadataArray);

  return new TopicMetadataResponse(new kafka.api.TopicMetadataResponse(brokers, topicsMetadata, -1));
}
 
Developer ID: linkedin, Project: pinot, Lines of code: 29, Source file: SimpleConsumerWrapperTest.java


Example 3: updateProducer

import kafka.api.TopicMetadata; // import the required package/class
public void updateProducer(Collection<TopicMetadata> topicMetadata) {
    final Set<Broker> newBrokers = Sets.newHashSet();
    Utils.foreach(topicMetadata, new Callable1<TopicMetadata>() {
        @Override
        public void apply(TopicMetadata tmd) {
            Utils.foreach(tmd.partitionsMetadata, new Callable1<PartitionMetadata>() {
                @Override
                public void apply(PartitionMetadata pmd) {
                    if(pmd.leader != null)
                        newBrokers.add(pmd.leader);
                }
            });
        }
    });
    synchronized(lock) {
        Utils.foreach(newBrokers, new Callable1<Broker>() {
            @Override
            public void apply(Broker b) {
                // Close any existing producer for this broker before replacing it.
                if (syncProducers.containsKey(b.id)) {
                    syncProducers.get(b.id).close();
                }
                syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b));
            }
        });
    }
}
 
Developer ID: bingoohuang, Project: buka, Lines of code: 28, Source file: ProducerPool.java
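
The leader-collection step of updateProducer can be tried without a Kafka dependency. The following is a minimal sketch under stated assumptions: BrokerInfo, PartitionInfo, TopicInfo and collectLeaders are hypothetical stand-ins invented for illustration and are not the kafka.api or buka classes. It only shows the idea of walking every partition, skipping partitions without a leader, and keeping each leader broker once.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-ins for illustration only; these are NOT the kafka.api classes.
class LeaderSketch {
    static final class BrokerInfo {
        final int id;
        final String host;
        BrokerInfo(int id, String host) { this.id = id; this.host = host; }
    }

    static final class PartitionInfo {
        final int partitionId;
        final BrokerInfo leader; // null when the partition currently has no leader
        PartitionInfo(int partitionId, BrokerInfo leader) {
            this.partitionId = partitionId;
            this.leader = leader;
        }
    }

    static final class TopicInfo {
        final String topic;
        final List<PartitionInfo> partitions;
        TopicInfo(String topic, List<PartitionInfo> partitions) {
            this.topic = topic;
            this.partitions = partitions;
        }
    }

    // Collects each distinct leader once, keyed by broker id.
    // Partitions without a leader are skipped.
    static Map<Integer, BrokerInfo> collectLeaders(List<TopicInfo> topics) {
        Map<Integer, BrokerInfo> leaders = new TreeMap<>();
        for (TopicInfo t : topics) {
            for (PartitionInfo p : t.partitions) {
                if (p.leader != null) {
                    leaders.putIfAbsent(p.leader.id, p.leader);
                }
            }
        }
        return leaders;
    }

    public static void main(String[] args) {
        BrokerInfo b1 = new BrokerInfo(1, "host-a");
        BrokerInfo b2 = new BrokerInfo(2, "host-b");
        List<TopicInfo> topics = Arrays.asList(
            new TopicInfo("orders", Arrays.asList(
                new PartitionInfo(0, b1), new PartitionInfo(1, b2))),
            new TopicInfo("audit", Arrays.asList(
                new PartitionInfo(0, b1), new PartitionInfo(1, null))));
        System.out.println(collectLeaders(topics).keySet()); // prints [1, 2]
    }
}
```

Keying the result by broker id avoids relying on equals/hashCode of the broker type, which the original snippet does not show.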


Example 4: fetchTopicMetadataFromZk

import kafka.api.TopicMetadata; // import the required package/class
public static Set<TopicMetadata> fetchTopicMetadataFromZk(Set<String> topics, final ZkClient zkClient) {
    final Map<Integer, Broker> cachedBrokerInfo = Maps.newHashMap();

    return Utils.mapSet(topics, new Function1<String, TopicMetadata>() {
        @Override
        public TopicMetadata apply(String topic) {
            return fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo);
        }
    });
}
 
Developer ID: bingoohuang, Project: buka, Lines of code: 11, Source file: AdminUtils.java


Example 5: testTopicPartitionCreationCount

import kafka.api.TopicMetadata; // import the required package/class
@Test
public void testTopicPartitionCreationCount()
    throws IOException, InterruptedException {
  String topic = "topicPartition4";
  int clusterCount = _kafkaTestHelper.getClusterCount();
  int partitionCount = clusterCount / 2;
  int zkPort = _kafkaTestHelper.getZookeeperPort();
  Properties props = new Properties();

  // Setting Topic Properties
  props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
  props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, String.valueOf(clusterCount));
  props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(partitionCount));
  props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, "localhost:" + zkPort);
  System.out.println(_kafkaTestHelper.getBootServersList());

  // Setting Producer Properties
  props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaTestHelper.getBootServersList());
  props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  Kafka09DataWriter<String> kafka09DataWriter = new Kafka09DataWriter<String>(props);
  String zookeeperConnect = "localhost:" + _kafkaTestHelper.getZookeeperPort();
  int sessionTimeoutMs = 10 * 1000;
  int connectionTimeoutMs = 8 * 1000;
  // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
  // createTopic() will only seem to work (it will return without error).  The topic will exist in
  // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
  // topic.
  ZkClient zkClient = new ZkClient(
      zookeeperConnect,
      sessionTimeoutMs,
      connectionTimeoutMs,
      ZKStringSerializer$.MODULE$);
  boolean isSecureKafkaCluster = false;
  ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

  TopicMetadata metaData =
      AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
  Assert.assertEquals(metaData.partitionsMetadata().size(), partitionCount);

}
 
Developer ID: apache, Project: incubator-gobblin, Lines of code: 42, Source file: Kafka09TopicProvisionTest.java


Example 6: testLiveTopicPartitionCreationCount

import kafka.api.TopicMetadata; // import the required package/class
@Test
public void testLiveTopicPartitionCreationCount()
    throws IOException, InterruptedException {
  String liveClusterCount = System.getProperty("live.cluster.count");
  String liveZookeeper = System.getProperty("live.zookeeper");
  String liveBroker = System.getProperty("live.broker");
  String topic = System.getProperty("live.newtopic");
  String topicReplicationCount = System.getProperty("live.newtopic.replicationCount");
  String topicPartitionCount = System.getProperty("live.newtopic.partitionCount");
  // Nothing to verify when no live cluster is configured.
  if (StringUtils.isEmpty(liveClusterCount)) {
    Assert.assertTrue(true);
    return;
  }
  // Derive the replication and partition counts from the cluster size when none are given.
  if (StringUtils.isEmpty(topicPartitionCount)) {
    int clusterCount = Integer.parseInt(liveClusterCount);
    clusterCount--;
    int partitionCount = clusterCount / 2;
    topicReplicationCount = String.valueOf(clusterCount);
    topicPartitionCount = String.valueOf(partitionCount);
  }

  Properties props = new Properties();
  // Setting Topic Properties
  props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
  props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, topicReplicationCount);
  props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, topicPartitionCount);
  props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, liveZookeeper);
  // Setting Producer Properties
  props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", liveBroker);
  props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  Kafka09DataWriter<String> kafka09DataWriter = new Kafka09DataWriter<String>(props);
  int sessionTimeoutMs = 10 * 1000;
  int connectionTimeoutMs = 8 * 1000;
  // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
  // createTopic() will only seem to work (it will return without error).  The topic will exist in
  // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
  // topic.
  ZkClient zkClient = new ZkClient(
      liveZookeeper,
      sessionTimeoutMs,
      connectionTimeoutMs,
      ZKStringSerializer$.MODULE$);
  boolean isSecureKafkaCluster = false;
  ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(liveZookeeper), isSecureKafkaCluster);

  TopicMetadata metaData =
      AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
  Assert.assertEquals(metaData.partitionsMetadata().size(), Integer.parseInt(topicPartitionCount));
}
 
Developer ID: apache, Project: incubator-gobblin, Lines of code: 52, Source file: Kafka09TopicProvisionTest.java


Example 7: KafkaSplitSource

import kafka.api.TopicMetadata; // import the required package/class
KafkaSplitSource(String connectorId, Table table,
        Iterable<Partition> hivePartitions,
        KafkaClientConfig kafkaConfig)
{
    this.connectorId = connectorId;
    this.fetchedIndex = 0;
    this.computedSplits = new ArrayList<Split>();
    String zookeeper = kafkaConfig.getZookeeper();
    int zkSessionTimeout = kafkaConfig.getZookeeperSessionTimeout();
    int zkConnectionTimeout = kafkaConfig.getZookeeperConnectTimeout();

    Map<String, String> tblProps = table.getParameters();
    String tableTopic = tblProps.get(KafkaTableProperties.kafkaTopicName);

    long splitRange = getDefault(tblProps, KafkaTableProperties.kafkaSplitRange, 60 * 60 * 1000);
    long scanRange = getDefault(tblProps, KafkaTableProperties.kafkaJobRange, 24 * 60 * 60 * 1000);
    int sampleRate = (int) getDefault(tblProps, KafkaTableProperties.kafkaTableSampleRate, 100);

    ZkClient zkclient = new ZkClient(zookeeper, zkSessionTimeout,
            zkConnectionTimeout, new ZKStringSerializer());

    TopicMetadata metadata = AdminUtils.fetchTopicMetadataFromZk(tableTopic, zkclient);
    List<PartitionMetadata> mds = scala.collection.JavaConversions.asJavaList(metadata.partitionsMetadata());

    List<long[]> offsetList = null;
    // if the table is partitioned, look at each partition and
    // determine the data to look at.
    List<FieldSchema> partCols = table.getPartitionKeys();
    if (partCols != null && partCols.size() > 0)
    {
        offsetList = generateTsOffsetsFromPartitions(hivePartitions, tblProps, splitRange, partCols);
    } else
    {
        // we will set the table property so that all the queries hit here.
        offsetList = generateTsOffsetsNoPartitions(scanRange, mds.size());
    }

    for (PartitionMetadata md : mds)
    {
        Broker broker = md.leader().get();
        for (long[] offsets : offsetList)
        {
            long startTs = offsets[0];
            long endTs = offsets[1];
            KafkaSplit split = new KafkaSplit(connectorId,
                    tableTopic, md.partitionId(),
                    broker.host(), broker.port(),
                    sampleRate,
                    startTs, endTs, zookeeper,
                    zkSessionTimeout, zkConnectionTimeout);
            this.computedSplits.add(split);
        }
    }
}
 
Developer ID: dropbox, Project: presto-kafka-connector, Lines of code: 55, Source file: KafkaSplitSourceProvider.java
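
The constructor above builds one split for every combination of partition and [startTs, endTs] pair, but the helper that produces those pairs (generateTsOffsetsNoPartitions) is not shown in the snippet. The sketch below is only one plausible way to cut a scan range into fixed-size windows; the class name SplitWindowSketch and the methods windows and splitCount are hypothetical and make no claim about how presto-kafka-connector actually does it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not taken from presto-kafka-connector.
class SplitWindowSketch {
    // Cuts [endTs - scanRangeMs, endTs) into consecutive windows of at most
    // splitRangeMs each. Every element is a {startTs, endTs} pair.
    static List<long[]> windows(long endTs, long scanRangeMs, long splitRangeMs) {
        if (scanRangeMs <= 0 || splitRangeMs <= 0) {
            throw new IllegalArgumentException("ranges must be positive");
        }
        List<long[]> result = new ArrayList<>();
        long start = endTs - scanRangeMs;
        while (start < endTs) {
            // The last window is shortened so it never runs past endTs.
            long end = Math.min(start + splitRangeMs, endTs);
            result.add(new long[] {start, end});
            start = end;
        }
        return result;
    }

    // One split per (partition, window) pair, mirroring the nested loops
    // in the constructor.
    static int splitCount(int partitionCount, List<long[]> windows) {
        return partitionCount * windows.size();
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        // Same defaults as the snippet: a 24-hour scan range cut into 1-hour splits.
        List<long[]> w = windows(24 * hour, 24 * hour, hour);
        System.out.println(w.size());         // prints 24
        System.out.println(splitCount(3, w)); // prints 72
    }
}
```

With the default values from the snippet, each partition would yield 24 splits under this assumption, so the total number of splits grows linearly with the partition count.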



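Note: The kafka.api.TopicMetadata examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright remains with the original authors; consult each project's License before redistributing or using the code. Do not reproduce without permission.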

