
Java ZKGroupTopicDirs Class Code Examples


This article collects typical usage examples of the Java class kafka.utils.ZKGroupTopicDirs. If you are wondering how ZKGroupTopicDirs is used in practice, the selected examples below may help.



The ZKGroupTopicDirs class belongs to the kafka.utils package. The 11 code examples below are sorted by popularity by default.
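For orientation, every example below calls consumerOffsetDir() or consumerOwnerDir() and appends a partition id to the result. The following is a minimal sketch of the ZooKeeper path layout those calls produce in the old ZooKeeper-based consumer. GroupTopicPaths is a hypothetical stand-in written only for illustration, not the Kafka class, and the layout is stated as an assumption about the Kafka versions these snippets target.

```java
// Hypothetical stand-in that mirrors the path layout assumed for
// kafka.utils.ZKGroupTopicDirs; it is NOT the Kafka class itself.
final class GroupTopicPaths {
    private static final String CONSUMERS_ROOT = "/consumers";

    private final String group;
    private final String topic;

    GroupTopicPaths(String group, String topic) {
        this.group = group;
        this.topic = topic;
    }

    /** Parent znode of the per-partition committed offsets. */
    String consumerOffsetDir() {
        return CONSUMERS_ROOT + "/" + group + "/offsets/" + topic;
    }

    /** Parent znode of the per-partition owner entries. */
    String consumerOwnerDir() {
        return CONSUMERS_ROOT + "/" + group + "/owners/" + topic;
    }

    public static void main(String[] args) {
        GroupTopicPaths dirs = new GroupTopicPaths("my-group", "events");
        // prints /consumers/my-group/offsets/events/0
        System.out.println(dirs.consumerOffsetDir() + "/0");
        // prints /consumers/my-group/owners/events/0
        System.out.println(dirs.consumerOwnerDir() + "/0");
    }
}
```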

Example 1: getZookeeperOffsets

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client,
                                                                   String topicStr) {
  Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
  List<String> partitions = asJavaListConverter(
      client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
  for (String partition : partitions) {
    TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
    Option<String> data = client.readDataMaybeNull(
        topicDirs.consumerOffsetDir() + "/" + partition)._1();
    if (data.isDefined()) {
      Long offset = Long.valueOf(data.get());
      offsets.put(key, new OffsetAndMetadata(offset));
    }
  }
  return offsets;
}
 
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 18, Source: KafkaSource.java


Example 2: getZookeeperOffsets

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) {
  Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
  List<String> partitions = asJavaListConverter(
      client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
  for (String partition : partitions) {
    TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
    Option<String> data = client.readDataMaybeNull(
        topicDirs.consumerOffsetDir() + "/" + partition)._1();
    if (data.isDefined()) {
      Long offset = Long.valueOf(data.get());
      offsets.put(key, new OffsetAndMetadata(offset));
    }
  }
  return offsets;
}
 
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 17, Source: KafkaChannel.java
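Examples 1 and 2 share one pattern: list the partition children under the offset directory, read each child, and keep only the partitions that actually hold data. A minimal sketch of that loop without any Kafka or ZooKeeper dependency might look like this. OffsetCollector and its reader parameter are hypothetical; the reader stands in for a ZooKeeper read such as readDataMaybeNull.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper, written for illustration only.
final class OffsetCollector {
    /**
     * @param offsetDir  parent path of the partition znodes
     * @param partitions child names under offsetDir (partition ids as strings)
     * @param reader     stand-in for a ZooKeeper read; empty when the znode has no data
     * @return partition id to committed offset, for partitions that have one
     */
    static Map<Integer, Long> collect(String offsetDir,
                                      List<String> partitions,
                                      Function<String, Optional<String>> reader) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (String partition : partitions) {
            // Partitions without stored data are skipped, as in the examples above.
            reader.apply(offsetDir + "/" + partition)
                  .ifPresent(data -> offsets.put(Integer.parseInt(partition),
                                                 Long.parseLong(data)));
        }
        return offsets;
    }
}
```

Passing the read operation in as a function keeps the loop testable against an in-memory map instead of a live ZooKeeper ensemble.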


Example 3: getOffsets

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
/**
 * @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
 * @param groupID consumer group to get offsets for
 * @param topic topic to get offsets for
 * @return mapping of (topic and) partition to offset
 */
public static Map<Pair<String,Integer>,Long> getOffsets(String zkServers,
                                                        String groupID,
                                                        String topic) {
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topic);
  Map<Pair<String,Integer>,Long> offsets = new HashMap<>();
  ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
  try {
    List<?> partitions = JavaConversions.seqAsJavaList(
        zkUtils.getPartitionsForTopics(
          JavaConversions.asScalaBuffer(Collections.singletonList(topic))).head()._2());
    partitions.forEach(partition -> {
      String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
      Option<String> maybeOffset = zkUtils.readDataMaybeNull(partitionOffsetPath)._1();
      Long offset = maybeOffset.isDefined() ? Long.valueOf(maybeOffset.get()) : null;
      offsets.put(new Pair<>(topic, Integer.valueOf(partition.toString())), offset);
    });
  } finally {
    zkUtils.close();
  }
  return offsets;
}
 
Developer ID: oncewang, Project: oryx2, Lines of code: 28, Source: KafkaUtils.java


Example 4: setOffsets

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
/**
 * @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
 * @param groupID consumer group to update
 * @param offsets mapping of (topic and) partition to offset to push to Zookeeper
 */
@SuppressWarnings("deprecation")
public static void setOffsets(String zkServers,
                              String groupID,
                              Map<Pair<String,Integer>,Long> offsets) {
  ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
  try {
    offsets.forEach((topicAndPartition, offset) -> {
      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topicAndPartition.getFirst());
      int partition = topicAndPartition.getSecond();
      String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
      // TODO replace call below with defaultAcls(false, "") when < 0.10.2 is supported
      zkUtils.updatePersistentPath(partitionOffsetPath,
                                   Long.toString(offset),
                                   ZkUtils$.MODULE$.DefaultAcls(false));
    });
  } finally {
    zkUtils.close();
  }
}
 
Developer ID: oncewang, Project: oryx2, Lines of code: 25, Source: KafkaUtils.java
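Example 3 can store a null offset for a partition that has nothing committed, while Example 4 calls Long.toString(offset) on every entry, which would throw a NullPointerException if such a map were passed straight through. The sketch below shows one way to turn the map into a list of znode writes while skipping null entries. OffsetPathPlanner is hypothetical, the key type is a plain Map.Entry rather than the project's Pair class, and the path layout is the assumed /consumers/<group>/offsets/<topic>/<partition> form.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, written for illustration only.
final class OffsetPathPlanner {
    /**
     * Maps each (topic, partition) entry to the znode path and the string
     * payload that would be written there. Entries with a null offset are skipped.
     */
    static Map<String, String> plan(String group,
                                    Map<Map.Entry<String, Integer>, Long> offsets) {
        Map<String, String> writes = new LinkedHashMap<>();
        offsets.forEach((topicAndPartition, offset) -> {
            if (offset == null) {
                return; // nothing committed for this partition
            }
            String path = "/consumers/" + group + "/offsets/"
                    + topicAndPartition.getKey() + "/" + topicAndPartition.getValue();
            writes.put(path, Long.toString(offset));
        });
        return writes;
    }
}
```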


Example 5: getOffsetFromZooKeeper

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
	ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
	String path = topicDirs.consumerOffsetDir() + "/" + partition;
	curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());

	byte[] data = curatorClient.getData().forPath(path);

	if (data == null) {
		return null;
	} else {
		String asString = new String(data, ConfigConstants.DEFAULT_CHARSET);
		if (asString.length() == 0) {
			return null;
		} else {
			try {
				return Long.valueOf(asString);
			}
			catch (NumberFormatException e) {
				LOG.error(
						"The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
					groupId, topic, partition, asString);
				return null;
			}
		}
	}
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 27, Source: ZookeeperOffsetHandler.java


Example 6: getOffsetFromZooKeeper

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
	ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
	String path = topicDirs.consumerOffsetDir() + "/" + partition;
	curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
	
	byte[] data = curatorClient.getData().forPath(path);
	
	if (data == null) {
		return null;
	} else {
		String asString = new String(data);
		if (asString.length() == 0) {
			return null;
		} else {
			try {
				return Long.valueOf(asString);
			}
			catch (NumberFormatException e) {
				LOG.error(
						"The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
					groupId, topic, partition, asString);
				return null;
			}
		}
	}
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 27, Source: ZookeeperOffsetHandler.java


Example 7: setOffsets

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static void setOffsets(String zkOffsetManagement,
                              String groupID,
                              Map<TopicAndPartition, Long> offsets) {
    try (AutoZkClient zkClient = new AutoZkClient(zkOffsetManagement)) {
        offsets.forEach((topicAndPartition, offset) -> {
            ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topicAndPartition.topic());
            int partition = topicAndPartition.partition();
            String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
            ZkUtils.updatePersistentPath(zkClient, partitionOffsetPath, Long.toString(offset));
            log.info("updating offset path=" + partitionOffsetPath + " offset=" + offset);
        });
    }
}
 
Developer ID: ameyamk, Project: spark-streaming-direct-kafka, Lines of code: 14, Source: KafkaManager.java


Example 8: getOffsets

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static Map<TopicAndPartition, Long> getOffsets(String zkKafkaServers,
                                                      String zkOffSetManager,
                                                      String groupID,
                                                      String topic,
                                                      Map<String, String> kafkaParams) {
    ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topic);
    Map<TopicAndPartition, Long> offsets = new HashMap<>();

    // try-with-resources closes both ZooKeeper clients once the offsets are read
    try (AutoZkClient zkKafkaClient = new AutoZkClient(zkKafkaServers);
         AutoZkClient zkOffsetManagerClient = new AutoZkClient(zkOffSetManager)) {
        List<?> partitions = JavaConversions.seqAsJavaList(
                ZkUtils.getPartitionsForTopics(
                        zkKafkaClient,
                        JavaConversions.asScalaBuffer(Collections.singletonList(topic))).head()._2());
        partitions.forEach(partition -> {
            String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
            log.info("Offset location, zookeeper path=" + partitionOffsetPath);
            Option<String> maybeOffset = ZkUtils.readDataMaybeNull(zkOffsetManagerClient, partitionOffsetPath)._1();
            Long offset = maybeOffset.isDefined() ? Long.parseLong(maybeOffset.get()) : null;
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, Integer.parseInt(partition.toString()));
            offsets.put(topicAndPartition, offset);
        });
    }

    fillInLatestOffsets(offsets, kafkaParams); // in case offsets are blank for any partition
    return offsets;
}
 
Developer ID: ameyamk, Project: spark-streaming-direct-kafka, Lines of code: 28, Source: KafkaManager.java
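Example 8 stores null for partitions with no committed offset and then calls fillInLatestOffsets, whose body is not shown in the excerpt. The sketch below is only one plausible shape for such a step, not the project's implementation. OffsetDefaults, the fallback map, and the lastResort value are all hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, written for illustration only.
final class OffsetDefaults {
    /**
     * Returns a copy of offsets in which every null value is replaced by the
     * fallback for that key, or by lastResort when the fallback has no entry.
     * The input map is left unmodified.
     */
    static <K> Map<K, Long> fillMissing(Map<K, Long> offsets,
                                        Map<K, Long> fallback,
                                        long lastResort) {
        Map<K, Long> filled = new HashMap<>(offsets);
        filled.replaceAll((key, value) ->
                value != null ? value : fallback.getOrDefault(key, lastResort));
        return filled;
    }
}
```

Returning a copy rather than mutating the argument keeps the raw ZooKeeper view available for logging or comparison.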


Example 9: setOffsetInZooKeeper

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
	ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
	String path = topicDirs.consumerOffsetDir() + "/" + partition;
	curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
	byte[] data = Long.toString(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
	curatorClient.setData().forPath(path, data);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 8, Source: ZookeeperOffsetHandler.java


Example 10: setOffsetInZooKeeper

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
	ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
	String path = topicDirs.consumerOffsetDir() + "/" + partition;
	curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
	byte[] data = Long.toString(offset).getBytes();
	curatorClient.setData().forPath(path, data);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 8, Source: ZookeeperOffsetHandler.java
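Examples 5, 6, 9 and 10 all store the offset as a decimal string. Examples 6 and 10 use new String(data) and getBytes() without a charset, so they depend on the platform default encoding. The sketch below pins the charset explicitly and treats null, empty, and malformed payloads as "no offset", matching the read-side behavior of Examples 5 and 6. OffsetCodec is a hypothetical name, and UTF-8 is an assumption here, since the value of ConfigConstants.DEFAULT_CHARSET is not shown in the excerpts.

```java
import java.nio.charset.StandardCharsets;
import java.util.OptionalLong;

// Hypothetical helper, written for illustration only.
final class OffsetCodec {
    /** Encodes an offset as its decimal string in UTF-8. */
    static byte[] encode(long offset) {
        return Long.toString(offset).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a znode payload; empty for null, empty, or malformed data. */
    static OptionalLong decode(byte[] data) {
        if (data == null || data.length == 0) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(new String(data, StandardCharsets.UTF_8)));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }
}
```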


Example 11: getConsumerPartitionStream

import kafka.utils.ZKGroupTopicDirs; // import the required package/class
private Stream<ConsumerPartitionVO> getConsumerPartitionStream(String groupId,
                                                               String topicName,
                                                               TopicVO topicOpt)
{
   ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topicName);

   // Look the topic up only when the caller did not pass a matching one
   if (topicOpt == null || !topicOpt.getName().equals(topicName))
   {
      topicOpt = getTopic(topicName).orElse(null);
   }

   if (topicOpt != null)
   {
      final TopicVO topic = topicOpt;

      Map<Integer, Long> consumerOffsets = getConsumerOffsets(groupId, topic);

      return topic.getPartitions().stream()
         .map(partition -> {
            int partitionId = partition.getId();

            final ConsumerPartitionVO consumerPartition = new ConsumerPartitionVO(groupId, topicName, partitionId);
            consumerPartition.setOwner(
               Optional.ofNullable(
                  consumerTreeCache.getCurrentData(groupTopicDirs.consumerOwnerDir() + "/" + partitionId))
                  .map(data -> new String(data.getData()))
                  .orElse(null));

            consumerPartition.setOffset(consumerOffsets.getOrDefault(partitionId, -1L));

            final Optional<TopicPartitionVO> topicPartition = topic.getPartition(partitionId);
            consumerPartition.setSize(topicPartition.map(TopicPartitionVO::getSize).orElse(-1L));
            consumerPartition.setFirstOffset(topicPartition.map(TopicPartitionVO::getFirstOffset).orElse(-1L));

            return consumerPartition;
         });
   }
   else
   {
      return Stream.empty();
   }
}
 
Developer ID: HomeAdvisor, Project: Kafdrop, Lines of code: 43, Source: CuratorKafkaMonitor.java



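Note: The kafka.utils.ZKGroupTopicDirs examples in this article were compiled from GitHub, MSDocs, and other source code and documentation platforms. The snippets are taken from open-source projects, and copyright remains with the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.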

