This article collects typical usage examples of the Java class kafka.javaapi.OffsetRequest. If you are wondering what the OffsetRequest class does, how to use it, or what working examples look like, the curated code samples below should help.
The OffsetRequest class belongs to the kafka.javaapi package. The sections below show 20 code examples of the class, sorted by popularity by default.
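Every example below follows the same basic request/response cycle: build a map from TopicAndPartition to PartitionOffsetRequestInfo, wrap it in an OffsetRequest together with an API version and client id, and send it through SimpleConsumer.getOffsetsBefore(). The minimal sketch below illustrates that cycle end to end; the broker address, topic name, partition number, and client id are placeholders, not values taken from any of the projects listed later.
import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetRequestSketch {
  public static void main(String[] args) {
    // Placeholder connection settings -- replace with a real broker, topic, and partition.
    SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "offset-sketch");
    String topic = "my-topic";
    int partition = 0;
    try {
      // Request the single latest offset of the partition.
      TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
      Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
          new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
      requestInfo.put(topicAndPartition,
          new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
      OffsetRequest request = new OffsetRequest(
          requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-sketch");
      OffsetResponse response = consumer.getOffsetsBefore(request);
      if (response.hasError()) {
        System.err.println("Offset request failed, error code: " + response.errorCode(topic, partition));
      } else {
        System.out.println("Latest offset: " + response.offsets(topic, partition)[0]);
      }
    } finally {
      consumer.close();
    }
  }
}
The individual examples differ mainly in which timestamp they pass to PartitionOffsetRequestInfo (EarliestTime(), LatestTime(), or an explicit time) and in how they handle errors and retries.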
Example 1: getOffset
import kafka.javaapi.OffsetRequest; // import the required class
private static OffsetInfo getOffset(String topic, PartitionMetadata partition) {
  Broker broker = partition.leader();
  SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), 10000, 1000000,
      "com.rekko.newrelic.storm.kafka");
  try {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition.partitionId());
    // -1 means "latest time": ask for the single most recent offset of the partition.
    PartitionOffsetRequestInfo request = new PartitionOffsetRequestInfo(-1, 1);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> map =
        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    map.put(topicAndPartition, request);
    // Version 0 of the offset request API is hard-coded here.
    OffsetRequest req = new OffsetRequest(map, (short) 0, "com.rekko.newrelic.storm.kafka");
    OffsetResponse resp = consumer.getOffsetsBefore(req);
    OffsetInfo offset = new OffsetInfo();
    offset.offset = resp.offsets(topic, partition.partitionId())[0];
    return offset;
  } finally {
    consumer.close();
  }
}
Developer: ghais, Project: newrelic_storm_kafka, Lines of code: 24, Source file: Kafka.java
Example 2: getLastOffset
import kafka.javaapi.OffsetRequest; // import the required class
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime,
    String clientName) {
  TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
      new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
  requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
  OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
  OffsetResponse response = consumer.getOffsetsBefore(request);
  if (response.hasError()) {
    System.out.println("Error fetching offset data from the broker. Reason: "
        + response.errorCode(topic, partition));
    return 0;
  }
  long[] offsets = response.offsets(topic, partition);
  return offsets[0];
}
Developer: wngn123, Project: wngn-jms-kafka, Lines of code: 17, Source file: SimpleConsumerExample.java
Example 3: getLastOffset
import kafka.javaapi.OffsetRequest; // import the required class
/**
* @param consumer
* @param topic
* @param partition
* @param whichTime
* @param clientName
* @return 0 if consumer is null at this time
*/
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName)
{
if (consumer == null) {
return 0;
}
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
Developer: apache, Project: apex-malhar, Lines of code: 27, Source file: KafkaMetadataUtil.java
Example 4: getOffset
import kafka.javaapi.OffsetRequest; // import the required class
public long getOffset(String topic, int partition, long startOffsetTime) {
SimpleConsumer simpleConsumer = findLeaderConsumer(partition);
if (simpleConsumer == null) {
LOG.error("Error consumer is null get offset from partition:" + partition);
return -1;
}
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets(topic, partition);
if (offsets.length > 0) {
return offsets[0];
} else {
return NO_OFFSET;
}
}
Developer: zhangjunfang, Project: jstorm-0.9.6.3-, Lines of code: 21, Source file: KafkaConsumer.java
Example 5: fetchResetOffset
import kafka.javaapi.OffsetRequest; // import the required class
public long fetchResetOffset(String reset){
long time = LatestTime();
if (reset != null && reset.equals(SmallestTimeString()))
time = EarliestTime();
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
TopicAndPartition tp = new TopicAndPartition(topic, partition);
PartitionOffsetRequestInfo info = new PartitionOffsetRequestInfo(time,1);
requestInfo.put(tp, info);
OffsetRequest request = new OffsetRequest(requestInfo,CurrentVersion(), clientId);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
//ErrorMapping.exceptionFor(response.errorCode(topic, partition)).printStackTrace();
throw new KafkaPartitionReaderException(response.errorCode(topic, partition));
}
long[] offsets = response.offsets(topic, partition);
//TODO: confirm with xiaoju why we need this check?
// if (offsets.length <= 0)
// continue;
return offsets[0];
}
Developer: pulsarIO, Project: druid-kafka-ext, Lines of code: 21, Source file: ConsumerPartitionReader.java
Example 6: findAllOffsets
import kafka.javaapi.OffsetRequest; // import the required class
private static long[] findAllOffsets(SimpleConsumer consumer, String topicName, int partitionId)
{
TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, partitionId);
// The API implies that this will always return all of the offsets. So it seems a partition can not have
// more than Integer.MAX_VALUE-1 segments.
//
// This also assumes that the lowest value returned will be the first segment available. So if segments have been dropped off, this value
// should not be 0.
PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), Integer.MAX_VALUE);
OffsetRequest offsetRequest = new OffsetRequest(ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo), kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest);
if (offsetResponse.hasError()) {
short errorCode = offsetResponse.errorCode(topicName, partitionId);
log.warn("Offset response has error: %d", errorCode);
throw new PrestoException(KAFKA_SPLIT_ERROR, "could not fetch data from Kafka, error code is '" + errorCode + "'");
}
return offsetResponse.offsets(topicName, partitionId);
}
Developer: y-lan, Project: presto, Lines of code: 22, Source file: KafkaSplitManager.java
Example 7: getEarliestOffset
import kafka.javaapi.OffsetRequest; // import the required class
@Override
public long getEarliestOffset() {
if (this.earliestOffset == -2 && uri != null) {
// TODO: Make the hardcoded parameters configurable
SimpleConsumer consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 60000,
1024 * 1024, "hadoop-etl");
Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
offsetInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(
kafka.api.OffsetRequest.EarliestTime(), 1));
OffsetResponse response = consumer
.getOffsetsBefore(new OffsetRequest(offsetInfo, kafka.api.OffsetRequest
.CurrentVersion(), "hadoop-etl"));
long[] endOffset = response.offsets(topic, partition);
consumer.close();
this.earliestOffset = endOffset[0];
return endOffset[0];
} else {
return this.earliestOffset;
}
}
Developer: HiveKa, Project: HiveKa, Lines of code: 21, Source file: KafkaRequest.java
Example 8: getLastOffset
import kafka.javaapi.OffsetRequest; // import the required class
@Override
public long getLastOffset(long time) {
SimpleConsumer consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 60000,
1024 * 1024, "hadoop-etl");
Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
offsetInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(
time, 1));
OffsetResponse response = consumer.getOffsetsBefore(new OffsetRequest(offsetInfo,
kafka.api.OffsetRequest.CurrentVersion(),"hadoop-etl"));
long[] endOffset = response.offsets(topic, partition);
consumer.close();
if (endOffset.length == 0)
{
log.info("No offsets were returned for topic " + topic + " and partition " + partition + "; the lookup below will fail");
}
this.latestOffset = endOffset[0];
return endOffset[0];
}
Developer: HiveKa, Project: HiveKa, Lines of code: 19, Source file: KafkaRequest.java
Example 9: findLastOffset
import kafka.javaapi.OffsetRequest; // import the required class
private long findLastOffset(TopicPartition topicPartition, SimpleConsumer consumer) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topicPartition.getTopic(),
topicPartition.getPartition());
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
kafka.api.OffsetRequest.LatestTime(), 1));
final String clientName = getClientName(topicPartition);
OffsetRequest request = new OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(),
clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
throw new RuntimeException("Error fetching offset data. Reason: " +
response.errorCode(topicPartition.getTopic(), topicPartition.getPartition()));
}
long[] offsets = response.offsets(topicPartition.getTopic(),
topicPartition.getPartition());
return offsets[0] - 1;
}
Developer: pinterest, Project: secor, Lines of code: 22, Source file: KafkaClient.java
Example 10: getEarliestOffsetFromKafka
import kafka.javaapi.OffsetRequest; // import the required class
private long getEarliestOffsetFromKafka(String topic, int partition, long startTime) {
LOG.info("getEarliestOffsetFromKafka.");
TopicAndPartition tp = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(tp, new PartitionOffsetRequestInfo(startTime, 1));
OffsetRequest req = new OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), getClientName());
OffsetResponse resp = consumer.getOffsetsBefore(req);
if (resp.hasError()) {
LOG.error("error when fetching offset: " + resp.errorCode(topic, partition)); //xxx
return 0;
}
LOG.info("Earliest offset " + resp.offsets(topic, partition)[0]);
return resp.offsets(topic, partition)[0];
}
Developer: DemandCube, Project: Scribengin, Lines of code: 22, Source file: ScribeConsumer.java
Example 11: getLastOffSet
import kafka.javaapi.OffsetRequest; // import the required class
/**
 * Gets the offset of the consumer belonging to the given group id in the given topic and partition.
 *
 * @param consumer    the consumer
 * @param groupId     the consumer group id
 * @param topic       the topic
 * @param partitionID the partition id
 * @param whichTime   when the consumer has not consumed any data yet, determines the offset in the
 *                    topic partition from which reading should start
 * @param clientName  the client name
 *
 * @return a non-negative offset under normal circumstances, or -1 when an error occurs
 */
public long getLastOffSet(SimpleConsumer consumer, String groupId, String topic, int partitionID, long whichTime,
    String clientName) {
  // 1. Fetch the offset from ZK; a value greater than 0 indicates a valid, previously committed offset.
  long offset = this.getOffsetOfTopicAndPartition(consumer, groupId, clientName, topic, partitionID);
  if (offset > 0) {
    return offset;
  }
  // 2. Otherwise, fetch the data offset of the current topic partition from the broker.
  TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap =
      new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
  requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
  OffsetRequest request = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), clientName);
  OffsetResponse response = consumer.getOffsetsBefore(request);
  if (response.hasError()) {
    System.out.println("Error fetching offset data from the broker. Reason: "
        + response.errorCode(topic, partitionID));
    return -1;
  }
  // Extract the offset.
  long[] offsets = response.offsets(topic, partitionID);
  return offsets[0];
}
Developer: wngn123, Project: wngn-jms-kafka, Lines of code: 40, Source file: JavaKafkaSimpleConsumerAPI.java
Example 12: getOffset
import kafka.javaapi.OffsetRequest; // import the required class
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
if (config.forceFromStart) {
startOffsetTime = config.startOffsetTime;
}
return getOffset(consumer, topic, partition, startOffsetTime);
}
Developer: redBorder, Project: rb-bi, Lines of code: 8, Source file: KafkaUtils.java
Example 13: getLastOffset
import kafka.javaapi.OffsetRequest; // import the required class
/**
* Retrieves the last offset before the given timestamp for a given topic partition.
*
* @return The last offset before the given timestamp or {@code 0} if failed to do so.
*/
private long getLastOffset(TopicPartition topicPart, long timestamp) {
BrokerInfo brokerInfo = brokerService.getLeader(topicPart.getTopic(), topicPart.getPartition());
SimpleConsumer consumer = brokerInfo == null ? null : consumers.getUnchecked(brokerInfo);
// If no broker, treat it as failure attempt.
if (consumer == null) {
LOG.warn("Failed to talk to any broker. Default offset to 0 for {}", topicPart);
return 0L;
}
// Fire offset request
OffsetRequest request = new OffsetRequest(ImmutableMap.of(
new TopicAndPartition(topicPart.getTopic(), topicPart.getPartition()),
new PartitionOffsetRequestInfo(timestamp, 1)
), kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
OffsetResponse response = consumer.getOffsetsBefore(request);
// Retrieve offsets from response
long[] offsets = response.hasError() ? null : response.offsets(topicPart.getTopic(), topicPart.getPartition());
if (offsets == null || offsets.length <= 0) {
short errorCode = response.errorCode(topicPart.getTopic(), topicPart.getPartition());
// If the topic partition doesn't exist, use offset 0 without logging an error.
if (errorCode != ErrorMapping.UnknownTopicOrPartitionCode()) {
consumers.refresh(brokerInfo);
LOG.warn("Failed to fetch offset for {} with timestamp {}. Error: {}. Default offset to 0.",
topicPart, timestamp, errorCode);
}
return 0L;
}
LOG.debug("Offset {} fetched for {} with timestamp {}.", offsets[0], topicPart, timestamp);
return offsets[0];
}
Developer: apache, Project: twill, Lines of code: 41, Source file: SimpleKafkaConsumer.java
Example 14: getOffset
import kafka.javaapi.OffsetRequest; // import the required class
private OffsetResponse getOffset(final long whichTime) {
verifyBroker();
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
final OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
return offsetResponseRetryer.retryInfinitely(new Callable<OffsetResponse>() {
@Override
public OffsetResponse call() throws Exception {
try {
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response != null) {
long[] offsets = response.offsets(topic, partitionId);
if (offsets != null && offsets.length > 0)
return response;
}
LOG.warn("Error fetching offset data: {}-[{}:{}] Error code: {}", consumerGroup, topic, partitionId, response != null ? response.errorCode(topic, partitionId) : -1);
} catch (Exception e) {
LOG.warn("Error fetching offset data: {}-[{}:{}] Exception: {}", consumerGroup, topic, partitionId, e.getMessage(), e);
}
connect(); //always reconnect in case offset cannot be obtained
return null;
}
});
}
Developer: jeoffreylim, Project: maelstrom, Lines of code: 30, Source file: KafkaConsumer.java
Example 15: getEarliestOffset
import kafka.javaapi.OffsetRequest; // import the required class
@Override
protected long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetRequestInfo =
Collections.singletonMap(new TopicAndPartition(partition.getTopicName(), partition.getId()),
new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
return getOffset(partition, offsetRequestInfo);
}
Developer: Hanmourang, Project: Gobblin, Lines of code: 8, Source file: KafkaWrapper.java
Example 16: getLatestOffset
import kafka.javaapi.OffsetRequest; // import the required class
@Override
protected long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetRequestInfo =
Collections.singletonMap(new TopicAndPartition(partition.getTopicName(), partition.getId()),
new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
return getOffset(partition, offsetRequestInfo);
}
Developer: Hanmourang, Project: Gobblin, Lines of code: 8, Source file: KafkaWrapper.java
Example 17: getOffset
import kafka.javaapi.OffsetRequest; // import the required class
private long getOffset(KafkaPartition partition,
Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetRequestInfo)
throws KafkaOffsetRetrievalFailureException {
SimpleConsumer consumer = this.getSimpleConsumer(partition.getLeader().getHostAndPort());
for (int i = 0; i < NUM_TRIES_FETCH_OFFSET; i++) {
try {
OffsetResponse offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(offsetRequestInfo,
kafka.api.OffsetRequest.CurrentVersion(), DEFAULT_KAFKA_CLIENT_NAME));
if (offsetResponse.hasError()) {
throw new RuntimeException(
"offsetReponse has error: " + offsetResponse.errorCode(partition.getTopicName(), partition.getId()));
}
return offsetResponse.offsets(partition.getTopicName(), partition.getId())[0];
} catch (Exception e) {
LOG.warn(
String.format("Fetching offset for partition %s has failed %d time(s). Reason: %s", partition, i + 1, e));
if (i < NUM_TRIES_FETCH_OFFSET - 1) {
try {
Thread.sleep((long) ((i + Math.random()) * 1000));
} catch (InterruptedException e2) {
LOG.error("Caught interrupted exception between retries of getting latest offsets. " + e2);
}
}
}
}
throw new KafkaOffsetRetrievalFailureException(
String.format("Fetching offset for partition %s has failed.", partition));
}
Developer: Hanmourang, Project: Gobblin, Lines of code: 29, Source file: KafkaWrapper.java
Example 18: getOffset
import kafka.javaapi.OffsetRequest; // import the required class
private static long getOffset(SimpleConsumer simpleConsumer, String topic, int partition, long startOffsetTime) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets(topic, partition);
if (offsets.length > 0) {
return offsets[0];
} else {
return NO_OFFSET;
}
}
Developer: linzhaoming, Project: easyframe-msg, Lines of code: 14, Source file: SimpleKafkaHelper.java
Example 19: fetchResetOffset
import kafka.javaapi.OffsetRequest; // import the required class
private long fetchResetOffset(String reset) {
long time = LatestTime();
if (reset != null && reset.equals(SmallestTimeString()))
time = EarliestTime();
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
TopicAndPartition tp = new TopicAndPartition(m_topic, m_partition);
PartitionOffsetRequestInfo info = new PartitionOffsetRequestInfo(time, 1);
requestInfo.put(tp, info);
OffsetRequest request = new OffsetRequest(requestInfo, CurrentVersion(), m_clientId);
for (int i = 0; i < 2; i++) {
for (int j = 0; j < m_retries; j++) {
OffsetResponse response = m_consumer.getOffsetsBefore(request);
if (response.hasError()) {
short errorCode = response.errorCode(m_topic, m_partition);
LOGGER.warn("Error when fetching offset from kafka, errorCode=" + errorCode);
continue;
}
long[] offsets = response.offsets(m_topic, m_partition);
if (offsets.length <= 0)
continue;
return offsets[0];
}
// cannot get offset after retries, reinit and try again
reinit();
}
throw new RuntimeException("Fail to get resetOffset " + reset
+ " after retries for " + m_clientId);
}
Developer: pulsarIO, Project: jetstream, Lines of code: 36, Source file: PartitionReader.java
Example 20: matches
import kafka.javaapi.OffsetRequest; // import the required class
@Override
public boolean matches(Object argument) {
if (!(argument instanceof OffsetRequest))
return false;
OffsetRequest req = (OffsetRequest) argument;
PartitionOffsetRequestInfo reqInfo = req.underlying().requestInfo().get(tp).get();
return reqInfo.time() == EarliestTime();
}
Developer: pulsarIO, Project: jetstream, Lines of code: 12, Source file: PartitionReaderTest.java
Note: The kafka.javaapi.OffsetRequest examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The code snippets are taken from open-source projects contributed by their respective developers; copyright remains with the original authors. Refer to each project's license before using or redistributing the code, and do not republish without permission.