本文整理汇总了Java中kafka.common.OffsetMetadataAndError类的典型用法代码示例。如果您正苦于以下问题:Java OffsetMetadataAndError类的具体用法?Java OffsetMetadataAndError怎么用?Java OffsetMetadataAndError使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
OffsetMetadataAndError类属于kafka.common包,在下文中一共展示了OffsetMetadataAndError类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: readOffset
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
public long readOffset(String topic, int partition, String groupId) {
BlockingChannel channel = new BlockingChannel(host, port,
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
(int) Duration.ofSeconds(10).toMillis());
try {
channel.connect();
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
OffsetFetchRequest offsetFetchRequest = new OffsetFetchRequest(
groupId, singletonList(topicAndPartition), (short) 1, RANDOM.nextInt(), "test-checker");
channel.send(offsetFetchRequest.underlying());
OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload());
OffsetMetadataAndError result = fetchResponse.offsets().get(topicAndPartition);
return result.offset();
} finally {
channel.disconnect();
}
}
开发者ID:andreschaffer,项目名称:microservices-testing-examples,代码行数:19,代码来源:KafkaOffsets.java
示例2: writeTo
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
@Override
public void writeTo(final ByteBuffer buffer) {
buffer.putInt(correlationId);
buffer.putInt(requestInfoGroupedByTopic.size()); // number of topics
Utils.foreach(requestInfoGroupedByTopic, new Callable2<String, Map<TopicAndPartition,OffsetMetadataAndError>>() {
@Override
public void apply(String topic, Map<TopicAndPartition, OffsetMetadataAndError> arg2) {
writeShortString(buffer, topic); // topic
buffer.putInt(arg2.size()); // number of partitions for this topic
Utils.foreach(arg2, new Callable2<TopicAndPartition, OffsetMetadataAndError>() {
@Override
public void apply(TopicAndPartition _1, OffsetMetadataAndError _2) {
buffer.putInt(_1.partition);
buffer.putLong(_2.offset);
writeShortString(buffer, _2.metadata);
buffer.putShort(_2.error);
}
});
}
});
}
开发者ID:bingoohuang,项目名称:buka,代码行数:24,代码来源:OffsetFetchResponse.java
示例3: OffsetCommitRequest
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
public OffsetCommitRequest(String groupId,
Map<TopicAndPartition, OffsetMetadataAndError> requestInfo,
short versionId,
int correlationId,
String clientId) {
super(RequestKeys.OffsetCommitKey, correlationId);
this.groupId = groupId;
this.requestInfo = requestInfo;
this.versionId = versionId;
this.clientId = clientId;
requestInfoGroupedByTopic = Utils.groupBy(requestInfo, new Function2<TopicAndPartition, OffsetMetadataAndError, String>() {
@Override
public String apply(TopicAndPartition arg1, OffsetMetadataAndError arg2) {
return arg1.topic;
}
});
}
开发者ID:bingoohuang,项目名称:buka,代码行数:19,代码来源:OffsetCommitRequest.java
示例4: getOffsetOfTopicAndPartition
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
/**
* 从保存consumer消费者offset偏移量的位置获取当前consumer对应的偏移量
*
* @param consumer 消费者
* @param groupId Group Id
* @param clientName client名称
* @param topic topic名称
* @param partitionID 分区id
*
* @return
*/
public long getOffsetOfTopicAndPartition(SimpleConsumer consumer, String groupId, String clientName, String
topic, int partitionID) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
List<TopicAndPartition> requestInfo = new ArrayList<TopicAndPartition>();
requestInfo.add(topicAndPartition);
OffsetFetchRequest request = new OffsetFetchRequest(groupId, requestInfo, 0, clientName);
OffsetFetchResponse response = consumer.fetchOffsets(request);
// 获取返回值
Map<TopicAndPartition, OffsetMetadataAndError> returnOffsetMetadata = response.offsets();
// 处理返回值
if (returnOffsetMetadata != null && !returnOffsetMetadata.isEmpty()) {
// 获取当前分区对应的偏移量信息
OffsetMetadataAndError offset = returnOffsetMetadata.get(topicAndPartition);
if (offset.error().code() == ErrorMapping.NoError()) {
// 没有异常,表示是正常的,获取偏移量
return offset.offset();
} else {
// 当Consumer第一次连接的时候(zk中不在当前topic对应数据的时候),会产生UnknownTopicOrPartitionCode异常
System.out.println("Error fetching data Offset Data the Topic and Partition. Reason: " + offset.error
());
}
}
// 所有异常情况直接返回0
return 0;
}
开发者ID:wngn123,项目名称:wngn-jms-kafka,代码行数:39,代码来源:JavaKafkaSimpleConsumerAPI.java
示例5: fetchNextOffset
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
private long fetchNextOffset(SimpleConsumer consumer, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
List<TopicAndPartition> requestInfo = new ArrayList<>();
requestInfo.add(topicAndPartition);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupid, requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), correlationId, clientName);
OffsetFetchResponse response = null;
while (true) {
try {
logger.debug("partition {} fetch offest request", partition);
response = consumer.fetchOffsets(fetchRequest);
if (response != null)
break;
} catch (Exception e) {
logger.error("some error occur when fetch messages", e);
try {
Thread.sleep(EXCEPTION_SLEEP_TIME);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
OffsetMetadataAndError offset = response.offsets().get(topicAndPartition);
if (offset.error() == 0)
return offset.offset();
else
return 0;
}
开发者ID:onealcenter,项目名称:ExactlyOncePartitionConsumer,代码行数:30,代码来源:ExactlyOncePartitionConsumer.java
示例6: commitOffsets
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
public static OffsetCommitResponse commitOffsets(String groupId, Map<TopicAndPartition, OffsetMetadataAndError> requestInfo, short versionId,
int correlationId, String clientId) {
SimpleConsumer simpleConsumer = SimpleKafkaHelper.getDefaultSimpleConsumer();
OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupId, requestInfo, versionId, correlationId, clientId);
OffsetCommitResponse commitResponse = simpleConsumer.commitOffsets(commitRequest);
return commitResponse;
}
开发者ID:linzhaoming,项目名称:easyframe-msg,代码行数:8,代码来源:AdminUtil.java
示例7: fetchOffset
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
public long fetchOffset(final TopicAndPartition topicAndPartition) throws ConsumerOffsetsException {
BlockingChannel channel = getConnectionToOffsetManager(groupId);
final List<TopicAndPartition> partitions = new ArrayList<>();
partitions.add(topicAndPartition);
final OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
groupId,
partitions,
VERSION_ID,
correlationId.incrementAndGet(),
CLIENT_NAME + "_" + topicAndPartition.topic() + "_" + topicAndPartition.partition());
try {
channel.send(fetchRequest.underlying());
final OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
final OffsetMetadataAndError result = fetchResponse.offsets().get(topicAndPartition);
final short offsetFetchErrorCode = result.error();
if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
throw new ConsumerOffsetsException("Not coordinator for consumer - Retry the offset fetch", offsetFetchErrorCode);
} else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
throw new ConsumerOffsetsException("Offset load in progress - Retry the offset fetch", offsetFetchErrorCode);
} else {
// Success
return result.offset();
//String retrievedMetadata = result.metadata();
}
} catch (ConsumerOffsetsException e) {
throw e;
} catch (Throwable t) {
throw new ConsumerOffsetsException(t);
// Client should retry the commit
} finally {
if (channel != null && channel.isConnected()) {
channel.disconnect();
}
}
}
开发者ID:wired-mind,项目名称:usher,代码行数:37,代码来源:KafkaOffsets.java
示例8: readFrom
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
@Override
public RequestOrResponse readFrom(final ByteBuffer buffer) {
// Read values from the envelope
short versionId = buffer.getShort();
int correlationId = buffer.getInt();
String clientId = readShortString(buffer);
// Read the OffsetRequest
String consumerGroupId = readShortString(buffer);
int topicCount = buffer.getInt();
Map<TopicAndPartition, OffsetMetadataAndError> pairs = Utils.flatMaps(1, topicCount, new Function0<Map<TopicAndPartition, OffsetMetadataAndError>>() {
@Override
public Map<TopicAndPartition, OffsetMetadataAndError> apply() {
final String topic = readShortString(buffer);
int partitionCount = buffer.getInt();
return Utils.flatMap(1, partitionCount, new Function0<Tuple2<TopicAndPartition, OffsetMetadataAndError>>() {
@Override
public Tuple2<TopicAndPartition, OffsetMetadataAndError> apply() {
int partitionId = buffer.getInt();
long offset = buffer.getLong();
String metadata = readShortString(buffer);
return Tuple2.make(
new TopicAndPartition(topic, partitionId),
new OffsetMetadataAndError(offset, metadata)
);
}
});
}
});
return new OffsetCommitRequest(consumerGroupId, pairs, versionId, correlationId, clientId);
}
开发者ID:bingoohuang,项目名称:buka,代码行数:35,代码来源:OffsetCommitRequestReader.java
示例9: readFrom
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
public static OffsetFetchResponse readFrom(final ByteBuffer buffer) {
int correlationId = buffer.getInt();
int topicCount = buffer.getInt();
Map<TopicAndPartition, OffsetMetadataAndError> pairs = Utils.flatMaps(1, topicCount, new Function0<Map<TopicAndPartition, OffsetMetadataAndError>>() {
@Override
public Map<TopicAndPartition, OffsetMetadataAndError> apply() {
final String topic = readShortString(buffer);
int partitionCount = buffer.getInt();
return Utils.flatMap(1, partitionCount, new Function0<Tuple2<TopicAndPartition, OffsetMetadataAndError>>() {
@Override
public Tuple2<TopicAndPartition, OffsetMetadataAndError> apply() {
int partitionId = buffer.getInt();
long offset = buffer.getLong();
String metadata = readShortString(buffer);
short error = buffer.getShort();
return Tuple2.make(new TopicAndPartition(topic, partitionId),
new OffsetMetadataAndError(offset, metadata, error));
}
});
}
});
return new OffsetFetchResponse(pairs, correlationId);
}
开发者ID:bingoohuang,项目名称:buka,代码行数:29,代码来源:OffsetFetchResponse.java
示例10: OffsetFetchResponse
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
public OffsetFetchResponse(Map<TopicAndPartition, OffsetMetadataAndError> requestInfo, int correlationId) {
super(correlationId);
this.requestInfo = requestInfo;
requestInfoGroupedByTopic = Utils.groupBy(requestInfo, new Function2<TopicAndPartition, OffsetMetadataAndError, String>() {
@Override
public String apply(TopicAndPartition arg1, OffsetMetadataAndError arg2) {
return arg1.topic;
}
});
}
开发者ID:bingoohuang,项目名称:buka,代码行数:12,代码来源:OffsetFetchResponse.java
示例11: writeTo
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
@Override
public void writeTo(final ByteBuffer buffer) {
// Write envelope
buffer.putShort(versionId);
buffer.putInt(correlationId);
writeShortString(buffer, clientId);
// Write OffsetCommitRequest
writeShortString(buffer, groupId); // consumer group
buffer.putInt(requestInfoGroupedByTopic.size()); // number of topics
Utils.foreach(requestInfoGroupedByTopic, new Callable2<String, Map<TopicAndPartition,OffsetMetadataAndError>>() {
@Override
public void apply(String topic, Map<TopicAndPartition, OffsetMetadataAndError> arg2) {
writeShortString(buffer, topic); // topic
buffer.putInt(arg2.size()); // number of partitions for this topic
Utils.foreach(arg2, new Callable2<TopicAndPartition, OffsetMetadataAndError>() {
@Override
public void apply(TopicAndPartition a1, OffsetMetadataAndError a2) {
buffer.putInt(a1.partition); // partition
buffer.putLong(a2.offset); // offset
writeShortString(buffer, a2.metadata); // metadata
}
});
}
});
}
开发者ID:bingoohuang,项目名称:buka,代码行数:28,代码来源:OffsetCommitRequest.java
示例12: handleError
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
@Override
public void handleError(final Throwable e, RequestChannel requestChannel, Request request) {
Map<TopicAndPartition, Short> responseMap = Utils.map(requestInfo, new Function2<TopicAndPartition, OffsetMetadataAndError, Tuple2<TopicAndPartition, Short>>() {
@Override
public Tuple2<TopicAndPartition, Short> apply(TopicAndPartition topicAndPartition, OffsetMetadataAndError offset) {
return Tuple2.make(topicAndPartition, ErrorMapping.codeFor(e.getClass()));
}
});
OffsetCommitResponse errorResponse = new OffsetCommitResponse(responseMap, correlationId);
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)));
}
开发者ID:bingoohuang,项目名称:buka,代码行数:13,代码来源:OffsetCommitRequest.java
示例13: handleError
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
@Override
public void handleError(final Throwable e, RequestChannel requestChannel, Request request) {
Map<TopicAndPartition, OffsetMetadataAndError> responseMap = Utils.map(requestInfo, new Function1<TopicAndPartition, Tuple2<TopicAndPartition, OffsetMetadataAndError>>() {
@Override
public Tuple2<TopicAndPartition, OffsetMetadataAndError> apply(TopicAndPartition topicAndPartition) {
return Tuple2.make(topicAndPartition,
new OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset,
OffsetMetadataAndError.NoMetadata,
ErrorMapping.codeFor(e.getClass())));
}
});
OffsetFetchResponse errorResponse = new OffsetFetchResponse(responseMap, correlationId);
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)));
}
开发者ID:bingoohuang,项目名称:buka,代码行数:16,代码来源:OffsetFetchRequest.java
示例14: getLastCommitOffset
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
long getLastCommitOffset() {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionMetadata.partitionId());
List<TopicAndPartition> topicAndPartitions = new ArrayList<>();
topicAndPartitions.add(topicAndPartition);
OffsetFetchRequest oRequest = new OffsetFetchRequest(name, topicAndPartitions, (short) 0, 0, name);
OffsetFetchResponse oResponse = consumer.fetchOffsets(oRequest);
Map<TopicAndPartition, OffsetMetadataAndError> offsets = oResponse.offsets();
OffsetMetadataAndError offset = offsets.get(topicAndPartition);
long currOffset = offset.offset() ;
if(currOffset < 0) currOffset = 0;
return currOffset;
}
开发者ID:DemandCube,项目名称:Scribengin,代码行数:13,代码来源:KafkaPartitionReader.java
示例15: getLastOffset
import kafka.common.OffsetMetadataAndError; //导入依赖的package包/类
/**
* Retrieves latest committed offset for a given topic & partition. Uses the
* new Kafka offset storage API introduced in 0.8.1.
* @param consumer consumer client to use for request
* @param topic topic id for which to lookup offset
* @param partition partition id for which to lookup offset
* @param clientName client id to include in request
* @return the offset returned from the lead broker
*/
private static long getLastOffset(final SimpleConsumer consumer, final String topic,
final int partition, final String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
List<TopicAndPartition> partitions = new ArrayList<>();
partitions.add(topicAndPartition);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
GROUP_ID,
partitions,
(short) 1,
CORRELATION_ID,
clientName);
OffsetFetchResponse response = consumer.fetchOffsets(fetchRequest);
if (response == null) {
log.error("Error fetching offset data from the Broker.");
return -1;
}
OffsetMetadataAndError result = response.offsets().get(topicAndPartition);
short offsetFetchErrorCode = result.error();
if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
log.error("Error encountered whilst fetching Kafka offset: NotCoordinatorForConsumerCode");
return -1;
} else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
log.error("Error encountered whilst fetching Kafka offset: OffsetsLoadInProgressCode");
return -1;
} else {
long retrievedOffset = result.offset();
String retrievedMetadata = result.metadata();
log.debug("Received offsets for topic " + topic + " & partition " + partition);
log.debug("Offset: " + String.valueOf(retrievedOffset) + " Metadata: " + retrievedMetadata);
// if broker has returned -1 without error, we've yet to commit.
// start to read from 0
if (retrievedOffset == -1) {
log.info("No commits found against Kafka queue for topic "
+ topic + " & partition " + partition + ". Setting read offset to 0");
return 0;
} else {
return retrievedOffset;
}
}
}
开发者ID:datasift,项目名称:datasift-connector,代码行数:55,代码来源:SimpleConsumerManager.java
注:本文中的kafka.common.OffsetMetadataAndError类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论