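This article collects typical usage examples of the kafka.utils.ZKGroupTopicDirs class in Java. If you are wondering how ZKGroupTopicDirs is used in practice, the selected examples below may help.
The ZKGroupTopicDirs class belongs to the kafka.utils package. A total of 11 code examples are shown below, sorted by popularity by default. In every example the class is constructed from a consumer group ID and a topic name, and the result of consumerOffsetDir() or consumerOwnerDir() is used as the parent ZooKeeper path for per-partition nodes.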
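Before reading the examples, it may help to see the path layout they all rely on. The sketch below is plain Java with no Kafka dependency; `ConsumerPaths` is a hypothetical helper, and the `/consumers/<group>/offsets/<topic>` and `/consumers/<group>/owners/<topic>` layout is an assumption based on the old ZooKeeper-based consumer, so check it against the Kafka version you actually run.

```java
// Hypothetical stand-in for the paths returned by kafka.utils.ZKGroupTopicDirs.
// Not part of Kafka; the layout is assumed from the old ZooKeeper-based consumer.
final class ConsumerPaths {
    private ConsumerPaths() {}

    /** Assumed equivalent of consumerOffsetDir() + "/" + partition. */
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    /** Assumed equivalent of consumerOwnerDir() + "/" + partition. */
    static String ownerPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/owners/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        // prints /consumers/my-group/offsets/events/0
        System.out.println(offsetPath("my-group", "events", 0));
        // prints /consumers/my-group/owners/events/0
        System.out.println(ownerPath("my-group", "events", 0));
    }
}
```

Each example that follows appends "/" and a partition id to one of these two directories in the same way.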
Example 1: getZookeeperOffsets
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client,
                                                                   String topicStr) {
  Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  // groupId is defined outside this method
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
  // Each child node under the offset directory is named after a partition id
  List<String> partitions = asJavaListConverter(
      client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
  for (String partition : partitions) {
    TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
    Option<String> data = client.readDataMaybeNull(
        topicDirs.consumerOffsetDir() + "/" + partition)._1();
    // Skip partitions whose node holds no data
    if (data.isDefined()) {
      Long offset = Long.valueOf(data.get());
      offsets.put(key, new OffsetAndMetadata(offset));
    }
  }
  return offsets;
}
Developer: moueimei, Project: flume-release-1.7.0, Lines of code: 18, Source: KafkaSource.java
Example 2: getZookeeperOffsets
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) {
  Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  // groupId and topicStr are defined outside this method
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
  List<String> partitions = asJavaListConverter(
      client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
  for (String partition : partitions) {
    TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
    Option<String> data = client.readDataMaybeNull(
        topicDirs.consumerOffsetDir() + "/" + partition)._1();
    // Skip partitions whose node holds no data
    if (data.isDefined()) {
      Long offset = Long.valueOf(data.get());
      offsets.put(key, new OffsetAndMetadata(offset));
    }
  }
  return offsets;
}
Developer: moueimei, Project: flume-release-1.7.0, Lines of code: 17, Source: KafkaChannel.java
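Examples 1 and 2 share one pattern: list the partition children under the offset directory, read each child, and keep only the partitions that actually hold data. The sketch below isolates that loop so it can run without ZooKeeper. It assumes a plain `Map<String, String>` as a stand-in for the node store; `OffsetReader` and `readOffsets` are hypothetical names, not Kafka or Flume APIs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: a Map<String, String> plays the role of ZooKeeper,
// mapping node paths to their string contents.
final class OffsetReader {
    private OffsetReader() {}

    /** Returns partition id -> offset for every partition that has stored data. */
    static Map<Integer, Long> readOffsets(Map<String, String> znodes,
                                          String offsetDir,
                                          List<String> partitions) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (String partition : partitions) {
            String data = znodes.get(offsetDir + "/" + partition);
            // A missing node is skipped, as in the isDefined() check above
            if (data != null) {
                offsets.put(Integer.valueOf(partition), Long.valueOf(data));
            }
        }
        return offsets;
    }
}
```

Skipping absent partitions, rather than storing null, means callers can treat the absence of a key as "no committed offset".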
Example 3: getOffsets
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
/**
 * @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
 * @param groupID consumer group to get offsets for
 * @param topic topic to get offsets for
 * @return mapping of (topic and) partition to offset
 */
public static Map<Pair<String,Integer>,Long> getOffsets(String zkServers,
                                                        String groupID,
                                                        String topic) {
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topic);
  Map<Pair<String,Integer>,Long> offsets = new HashMap<>();
  ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
  try {
    List<?> partitions = JavaConversions.seqAsJavaList(
        zkUtils.getPartitionsForTopics(
          JavaConversions.asScalaBuffer(Collections.singletonList(topic))).head()._2());
    partitions.forEach(partition -> {
      String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
      Option<String> maybeOffset = zkUtils.readDataMaybeNull(partitionOffsetPath)._1();
      // A partition with no stored offset is mapped to null
      Long offset = maybeOffset.isDefined() ? Long.valueOf(maybeOffset.get()) : null;
      offsets.put(new Pair<>(topic, Integer.valueOf(partition.toString())), offset);
    });
  } finally {
    zkUtils.close();
  }
  return offsets;
}
Developer: oncewang, Project: oryx2, Lines of code: 28, Source: KafkaUtils.java
Example 4: setOffsets
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
/**
 * @param zkServers Zookeeper server string: host1:port1[,host2:port2,...]
 * @param groupID consumer group to update
 * @param offsets mapping of (topic and) partition to offset to push to Zookeeper
 */
@SuppressWarnings("deprecation")
public static void setOffsets(String zkServers,
                              String groupID,
                              Map<Pair<String,Integer>,Long> offsets) {
  ZkUtils zkUtils = ZkUtils.apply(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
  try {
    offsets.forEach((topicAndPartition, offset) -> {
      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topicAndPartition.getFirst());
      int partition = topicAndPartition.getSecond();
      String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
      // TODO replace call below with defaultAcls(false, "") when < 0.10.2 is supported
      zkUtils.updatePersistentPath(partitionOffsetPath,
                                   Long.toString(offset),
                                   ZkUtils$.MODULE$.DefaultAcls(false));
    });
  } finally {
    zkUtils.close();
  }
}
Developer: oncewang, Project: oryx2, Lines of code: 25, Source: KafkaUtils.java
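The write side in Example 4 reduces to turning each (topic, partition) key into a path and each offset into its decimal string. A minimal sketch of that mapping follows, useful for unit-testing the path logic without a ZooKeeper connection. `OffsetWritePlan` is a hypothetical helper, `Map.Entry<String, Integer>` stands in for the project's `Pair` type, and the `/consumers/<group>/offsets/<topic>` prefix is an assumed layout rather than something the example itself states.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: computes the (path, value) pairs that a setOffsets-style
// method would write, without touching ZooKeeper.
final class OffsetWritePlan {
    private OffsetWritePlan() {}

    static Map<String, String> plan(String group,
                                    Map<Map.Entry<String, Integer>, Long> offsets) {
        Map<String, String> writes = new LinkedHashMap<>();
        offsets.forEach((topicAndPartition, offset) -> {
            // Assumed layout of consumerOffsetDir() plus the partition child
            String path = "/consumers/" + group + "/offsets/"
                + topicAndPartition.getKey() + "/" + topicAndPartition.getValue();
            // Offsets are stored as decimal strings
            writes.put(path, Long.toString(offset));
        });
        return writes;
    }
}
```

Separating the planning step from the actual write is a design choice made here for testability; the original method performs both in one loop.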
Example 5: getOffsetFromZooKeeper
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
  String path = topicDirs.consumerOffsetDir() + "/" + partition;
  // Create the node (and its parents) if it does not exist yet
  curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
  byte[] data = curatorClient.getData().forPath(path);
  if (data == null) {
    return null;
  } else {
    String asString = new String(data, ConfigConstants.DEFAULT_CHARSET);
    if (asString.length() == 0) {
      return null;
    } else {
      try {
        return Long.valueOf(asString);
      }
      catch (NumberFormatException e) {
        LOG.error(
            "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
            groupId, topic, partition, asString);
        return null;
      }
    }
  }
}
Developer: axbaretto, Project: flink, Lines of code: 27, Source: ZookeeperOffsetHandler.java
Example 6: getOffsetFromZooKeeper
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
  String path = topicDirs.consumerOffsetDir() + "/" + partition;
  curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
  byte[] data = curatorClient.getData().forPath(path);
  if (data == null) {
    return null;
  } else {
    // Decodes with the platform default charset; Example 5 passes an explicit one
    String asString = new String(data);
    if (asString.length() == 0) {
      return null;
    } else {
      try {
        return Long.valueOf(asString);
      }
      catch (NumberFormatException e) {
        LOG.error(
            "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
            groupId, topic, partition, asString);
        return null;
      }
    }
  }
}
Developer: axbaretto, Project: flink, Lines of code: 27, Source: ZookeeperOffsetHandler.java
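The decoding rules in Examples 5 and 6 (null, empty, or malformed data all yield null) can be exercised in isolation. The following sketch is a flattened version of that logic under one assumption: it uses UTF-8 explicitly, whereas the originals use `ConfigConstants.DEFAULT_CHARSET` or the platform default. `OffsetDecoder` is a hypothetical name, and unlike the originals it does not log malformed values.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper mirroring the null / empty / malformed handling above.
final class OffsetDecoder {
    private OffsetDecoder() {}

    /** Returns the decoded offset, or null if the bytes hold no valid long. */
    static Long decode(byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        String asString = new String(data, StandardCharsets.UTF_8);
        try {
            return Long.valueOf(asString);
        } catch (NumberFormatException e) {
            // Malformed content is treated the same as "no offset stored"
            return null;
        }
    }
}
```

Returning null for all three failure cases keeps the caller's handling uniform, at the cost of hiding which case occurred; the original methods compensate by logging the malformed string.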
Example 7: setOffsets
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static void setOffsets(String zkOffsetManagement,
                              String groupID,
                              Map<TopicAndPartition, Long> offsets) {
  try (AutoZkClient zkClient = new AutoZkClient(zkOffsetManagement)) {
    offsets.forEach((topicAndPartition, offset) -> {
      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topicAndPartition.topic());
      int partition = topicAndPartition.partition();
      String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
      ZkUtils.updatePersistentPath(zkClient, partitionOffsetPath, Long.toString(offset));
      log.info("updating offset path=" + partitionOffsetPath + " offset=" + offset);
    });
  }
}
Developer: ameyamk, Project: spark-streaming-direct-kafka, Lines of code: 14, Source: KafkaManager.java
Example 8: getOffsets
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static Map<TopicAndPartition, Long> getOffsets(String zkKafkaServers,
                                                      String zkOffSetManager,
                                                      String groupID,
                                                      String topic,
                                                      Map<String, String> kafkaParams) {
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topic);
  Map<TopicAndPartition, Long> offsets = new HashMap<>();
  // try-with-resources closes both clients, as setOffsets does in the same class
  try (AutoZkClient zkKafkaClient = new AutoZkClient(zkKafkaServers);
       AutoZkClient zkOffsetManagerClient = new AutoZkClient(zkOffSetManager)) {
    List<?> partitions = JavaConversions.seqAsJavaList(
        ZkUtils.getPartitionsForTopics(
          zkKafkaClient,
          JavaConversions.asScalaBuffer(Collections.singletonList(topic))).head()._2());
    partitions.forEach(partition -> {
      String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
      log.info("Offset location, zookeeper path=" + partitionOffsetPath);
      Option<String> maybeOffset = ZkUtils.readDataMaybeNull(zkOffsetManagerClient, partitionOffsetPath)._1();
      Long offset = maybeOffset.isDefined() ? Long.parseLong(maybeOffset.get()) : null;
      TopicAndPartition topicAndPartition = new TopicAndPartition(topic, Integer.parseInt(partition.toString()));
      offsets.put(topicAndPartition, offset);
    });
  }
  fillInLatestOffsets(offsets, kafkaParams); // in case offsets are blank for any partition
  return offsets;
}
Developer: ameyamk, Project: spark-streaming-direct-kafka, Lines of code: 28, Source: KafkaManager.java
Example 9: setOffsetInZooKeeper
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
  String path = topicDirs.consumerOffsetDir() + "/" + partition;
  // Create the node (and its parents) if it does not exist yet
  curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
  // The offset is stored as its decimal string representation
  byte[] data = Long.toString(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
  curatorClient.setData().forPath(path, data);
}
Developer: axbaretto, Project: flink, Lines of code: 8, Source: ZookeeperOffsetHandler.java
Example 10: setOffsetInZooKeeper
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
  String path = topicDirs.consumerOffsetDir() + "/" + partition;
  curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
  // Encodes with the platform default charset; Example 9 passes an explicit one
  byte[] data = Long.toString(offset).getBytes();
  curatorClient.setData().forPath(path, data);
}
Developer: axbaretto, Project: flink, Lines of code: 8, Source: ZookeeperOffsetHandler.java
Example 11: getConsumerPartitionStream
import kafka.utils.ZKGroupTopicDirs; // import the required package/class
private Stream<ConsumerPartitionVO> getConsumerPartitionStream(String groupId,
                                                               String topicName,
                                                               TopicVO topicOpt)
{
  ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topicName);
  // Look the topic up when none was supplied or the supplied one has a different name
  if (topicOpt == null || !topicOpt.getName().equals(topicName))
  {
    topicOpt = getTopic(topicName).orElse(null);
  }
  if (topicOpt != null)
  {
    final TopicVO topic = topicOpt;
    Map<Integer, Long> consumerOffsets = getConsumerOffsets(groupId, topic);
    return topic.getPartitions().stream()
      .map(partition -> {
        int partitionId = partition.getId();
        final ConsumerPartitionVO consumerPartition = new ConsumerPartitionVO(groupId, topicName, partitionId);
        // The owner is read from the node under consumerOwnerDir(), if present
        consumerPartition.setOwner(
          Optional.ofNullable(
            consumerTreeCache.getCurrentData(groupTopicDirs.consumerOwnerDir() + "/" + partitionId))
            .map(data -> new String(data.getData()))
            .orElse(null));
        // -1 marks an unknown offset, size, or first offset
        consumerPartition.setOffset(consumerOffsets.getOrDefault(partitionId, -1L));
        final Optional<TopicPartitionVO> topicPartition = topic.getPartition(partitionId);
        consumerPartition.setSize(topicPartition.map(TopicPartitionVO::getSize).orElse(-1L));
        consumerPartition.setFirstOffset(topicPartition.map(TopicPartitionVO::getFirstOffset).orElse(-1L));
        return consumerPartition;
      });
  }
  else
  {
    return Stream.empty();
  }
}
Developer: HomeAdvisor, Project: Kafdrop, Lines of code: 43, Source: CuratorKafkaMonitor.java
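Note: The kafka.utils.ZKGroupTopicDirs examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright in the source code remains with the original authors; consult each project's License before distributing or using the code. Do not reproduce without permission.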