本文整理汇总了Java中org.apache.kafka.clients.consumer.Consumer类的典型用法代码示例。如果您正苦于以下问题:Java Consumer类的具体用法?Java Consumer怎么用?Java Consumer使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
Consumer类属于org.apache.kafka.clients.consumer包,在下文中一共展示了Consumer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: execute
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Consumes the remote and reconciliation topics and records the transaction id carried by
 * every received record key, looping for as long as {@code active} stays set.
 *
 * @param ctx service context (not read by this method)
 * @throws Exception if consuming or deserializing a record fails
 */
@Override
public void execute(ServiceContext ctx) throws Exception {
    active = true;
    receivedIds = new GridConcurrentHashSet<>();

    // Start from the shared consumer settings, then pin the group id and auto-commit.
    Properties consumerProps = new Properties();
    consumerProps.putAll(dataRecoveryConfig.getConsumerConfig());
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, ReceivedTransactionsListenerImpl.class.getSimpleName());
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProps)) {
        consumer.subscribe(Arrays.asList(dataRecoveryConfig.getRemoteTopic(), dataRecoveryConfig.getReconciliationTopic()));
        while (active) {
            for (ConsumerRecord<ByteBuffer, ByteBuffer> received : consumer.poll(500)) {
                TransactionMetadata txMetadata = serializer.deserialize(received.key());
                receivedIds.add(txMetadata.getTransactionId());
            }
            // Commit after each poll round, whether or not it returned records.
            consumer.commitSync();
        }
    }
}
开发者ID:epam,项目名称:Lagerta,代码行数:22,代码来源:ReceivedTransactionsListenerImpl.java
示例2: readKeyValues
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Subscribes the consumer to {@code topic} and polls it in fixed 100 ms steps, collecting
 * each record as a key/value pair. Polling stops once the accumulated poll budget reaches
 * {@code waitTime} or {@code continueConsuming} reports that no more messages are needed.
 *
 * @param topic Kafka topic to subscribe to and read from
 * @param consumer Kafka consumer used for polling
 * @param waitTime Upper bound, in milliseconds, on the summed poll timeouts
 * @param maxMessages Message limit handed to {@code continueConsuming}
 * @return The KeyValue elements read, in the order they were polled
 */
private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
        final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
    final int pollIntervalMs = 100;
    final List<KeyValue<K, V>> collected = new ArrayList<>();
    consumer.subscribe(Collections.singletonList(topic));
    for (int budgetUsedMs = 0;
         budgetUsedMs < waitTime && continueConsuming(collected.size(), maxMessages);
         budgetUsedMs += pollIntervalMs) {
        for (final ConsumerRecord<K, V> polled : consumer.poll(pollIntervalMs)) {
            collected.add(new KeyValue<>(polled.key(), polled.value()));
        }
    }
    return collected;
}
开发者ID:confluentinc,项目名称:ksql,代码行数:28,代码来源:IntegrationTestUtils.java
示例3: makeQueryRepository
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Builds a fresh {@link InMemoryQueryRepository} backed by the Kafka query change log of the
 * given Rya instance. The test simulates many commands that each use their own repository, so
 * the repository is re-created outside of the command to pick up the latest change log state.
 *
 * @param ryaInstance - The rya instance the repository is connected to. (not null)
 * @param createTopic - Set this to true if the change log topic doesn't exist yet.
 * @return A repository reading from and writing to the instance's change log topic.
 */
private QueryRepository makeQueryRepository(final String ryaInstance, final boolean createTopic) {
    requireNonNull(ryaInstance);

    // Resolve the change log topic and create it when the caller asks for that.
    final String topicName = KafkaTopics.queryChangeLogTopic(ryaInstance);
    if (createTopic) {
        kafka.createTopic(topicName);
    }

    // Wire a producer/consumer pair into the change log that backs the repository.
    final Producer<?, QueryChange> changeProducer =
            KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange> changeConsumer =
            KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog queryChangeLog = new KafkaQueryChangeLog(changeProducer, changeConsumer, topicName);
    return new InMemoryQueryRepository(queryChangeLog);
}
开发者ID:apache,项目名称:incubator-rya,代码行数:23,代码来源:DeleteQueryCommandIT.java
示例4: afterPropertiesSet
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Validates the bean's settings, then builds and starts the listenable consumer.
 *
 * <p>The configured {@code value.deserializer} class name is read first and then replaced by
 * the byte-array deserializer for the underlying {@link KafkaConsumer}; the originally
 * configured deserializer is instantiated separately and handed to the
 * {@code ListenableTracingConsumer}.
 *
 * @throws IllegalArgumentException if neither topics nor a topic pattern is set, or if
 *         {@code configs} is empty or {@code payloadListener} is null
 * @throws Exception declared by the implemented interface method
 */
@Override
@SuppressWarnings("unchecked")
public void afterPropertiesSet() throws Exception {
    if (topics == null && topicPatternString == null) {
        throw new IllegalArgumentException("topic info must not be null");
    }
    Assert.notEmpty(configs, "configs must not be null or empty");
    // The message previously read "must be null", the opposite of what is asserted.
    Assert.notNull(payloadListener, "payloadListener must not be null");

    // Remember the user's deserializer before overriding it for the raw consumer.
    String valueDeserializerKlass = (String) configs.get("value.deserializer");
    configs.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(configs);

    Deserializer valueDeserializer = createDeserializer(valueDeserializerKlass);
    valueDeserializer.configure(configs, false);

    if (topics != null) {
        listenableConsumer =
                new ListenableTracingConsumer<>(consumer, Arrays.asList(topics), valueDeserializer);
    } else {
        listenableConsumer =
                new ListenableTracingConsumer<>(consumer, Pattern.compile(topicPatternString), valueDeserializer);
    }

    // payloadListener was validated above, so it can be registered unconditionally.
    listenableConsumer.addListener(payloadListener);
    listenableConsumer.start();
}
开发者ID:YanXs,项目名称:nighthawk,代码行数:28,代码来源:ListenableConsumerFactoryBean.java
示例5: loadRunningConf
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Loads the running configuration for the splitting spout and caches its parts in fields.
 * A failure is logged and not rethrown; the reload status is saved in every case.
 *
 * @param reloadMsgJson reload message; {@code null} selects the "loaded" wording in the logs,
 *        anything else selects "reloaded"
 */
private void loadRunningConf(String reloadMsgJson) {
    final String eventName;
    if (reloadMsgJson == null) {
        eventName = "loaded";
    } else {
        eventName = "reloaded";
    }

    try {
        // Load the configuration held in zk
        this.confMap = FullPullHelper.loadConfProps(zkConnect, topologyId, zkTopoRoot, Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC);

        this.MAX_FLOW_THRESHOLD = (Integer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_MAX_FLOW_THRESHOLD);
        LOG.info("MAX_FLOW_THRESHOLD is {} on DataShardsSplittingSpout.loadRunningConf", MAX_FLOW_THRESHOLD);

        final Properties common = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON);
        this.commonProps = common;
        this.dsName = common.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME);
        this.fullPullSrcTopic = common.getProperty(Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC);

        this.consumer = (Consumer<String, byte[]>) confMap.get(FullPullHelper.RUNNING_CONF_KEY_CONSUMER);
        this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE);

        LOG.info("Running Config is " + eventName + " successfully for DataShardsSplittingSpout!");
    } catch (Exception e) {
        LOG.error(eventName + "ing running configuration encountered Exception!", e);
    } finally {
        // Runs on both the success and the failure path.
        FullPullHelper.saveReloadStatus(reloadMsgJson, "splitting-spout", true, zkConnect);
    }
}
开发者ID:BriData,项目名称:DBus,代码行数:21,代码来源:DataShardsSplittingSpout.java
示例6: loadRunningConf
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Loads the running configuration for the pulling spout and caches its parts in fields.
 * A failure is logged and rethrown; the reload status is saved in every case.
 *
 * @param reloadMsgJson reload message; {@code null} selects the "loaded" wording in the
 *        success log, anything else selects "reloaded"
 */
private void loadRunningConf(String reloadMsgJson) {
    final String eventName;
    if (reloadMsgJson == null) {
        eventName = "loaded";
    } else {
        eventName = "reloaded";
    }

    try {
        this.confMap = FullPullHelper.loadConfProps(zkConnect, topologyId, zkTopoRoot, Constants.ZkTopoConfForFullPull.FULL_PULL_MEDIANT_TOPIC);
        this.MAX_FLOW_THRESHOLD = (Integer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_MAX_FLOW_THRESHOLD);

        final Properties common = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON);
        this.commonProps = common;
        this.dsName = common.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME);

        this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE);
        this.consumer = (Consumer<String, byte[]>) confMap.get(FullPullHelper.RUNNING_CONF_KEY_CONSUMER);

        LOG.info("Running Config is " + eventName + " successfully for DataPullingSpout!");
    } catch (Exception e) {
        LOG.error("Loading running configuration encountered Exception!", e);
        throw e;
    } finally {
        // Runs on both the success and the failure path.
        FullPullHelper.saveReloadStatus(reloadMsgJson, "pulling-spout", true, zkConnect);
    }
}
开发者ID:BriData,项目名称:DBus,代码行数:19,代码来源:DataPullingSpout.java
示例7: createConsumer
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Creates a consumer assigned to partition 0 of every configured topic and positions it.
 *
 * <p>When {@code rollBack} is 0 the consumer is moved to the end of each partition. Otherwise
 * each partition is rewound by {@code rollBack} from its current position, clamped at offset 0
 * because {@code Consumer.seek} rejects negative offsets.
 *
 * @return the assigned and positioned consumer
 * @throws Exception if loading the consumer properties or positioning the consumer fails
 */
private Consumer<String, String> createConsumer() throws Exception {
    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        // Only partition 0 of each topic is assigned.
        List<TopicPartition> pts = topics.stream().map(s -> new TopicPartition(s, 0)).collect(Collectors.toList());
        consumer.assign(pts);
        if (rollBack == 0) {
            consumer.seekToEnd(pts);
        } else {
            for (TopicPartition topicPartition : pts) {
                // Never seek below 0, even when fewer than rollBack records precede the position.
                long target = Math.max(0L, consumer.position(topicPartition) - rollBack);
                consumer.seek(topicPartition, target);
                // Report the real rollback amount instead of a hard-coded figure.
                logger.info("Consumer rolled back by " + rollBack + ", now at offset: " + consumer.position(topicPartition));
            }
        }
        return consumer;
    } catch (RuntimeException e) {
        // Do not leak the consumer when assignment or positioning fails.
        consumer.close();
        throw e;
    }
}
开发者ID:BriData,项目名称:DBus,代码行数:23,代码来源:FullPullerPerfChecker.java
示例8: createConsumer
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Creates a consumer assigned to partition 0 of {@code topicName} and positions it: at the end
 * of the partition when {@code offset} is -1, otherwise at {@code offset}.
 *
 * @return the assigned and positioned consumer
 * @throws Exception declared because loading the consumer properties may fail
 */
private Consumer<String, String> createConsumer() throws Exception {
    Properties consumerProps = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

    TopicPartition partitionZero = new TopicPartition(topicName, 0);
    List<TopicPartition> assignment = Arrays.asList(partitionZero);
    consumer.assign(assignment);

    if (offset != -1) {
        // Explicit start offset requested.
        consumer.seek(partitionZero, offset);
        logger.info(String.format("read changed as offset: %s", consumer.position(partitionZero)));
    } else {
        // -1 means "start from the end of the partition".
        consumer.seekToEnd(assignment);
        logger.info("Consumer seek to end");
    }
    return consumer;
}
开发者ID:BriData,项目名称:DBus,代码行数:25,代码来源:KafkaReader.java
示例9: seekToTransaction
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Commits, for the given consumer group, the offsets in the local topic that
 * {@code offsetsForTimes} returns when queried with {@code transactionId} as the timestamp
 * for every partition. Partitions with no matching offset are left untouched.
 *
 * @param config recovery configuration providing the local topic and base consumer settings
 * @param transactionId value used as the lookup timestamp for every partition
 * @param kafkaFactory factory used to create the temporary consumer
 * @param groupId consumer group whose offsets are committed
 */
public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
        String groupId) {
    String localTopic = config.getLocalTopic();
    Properties groupProps = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);

    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(groupProps)) {
        List<PartitionInfo> partitions = consumer.partitionsFor(localTopic);

        // One lookup entry per partition, all keyed to the same transaction id.
        Map<TopicPartition, Long> lookup = new HashMap<>(partitions.size());
        for (PartitionInfo info : partitions) {
            lookup.put(new TopicPartition(localTopic, info.partition()), transactionId);
        }
        consumer.assign(lookup.keySet());

        Map<TopicPartition, OffsetAndTimestamp> located = consumer.offsetsForTimes(lookup);
        Map<TopicPartition, OffsetAndMetadata> pendingCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> hit : located.entrySet()) {
            OffsetAndTimestamp match = hit.getValue();
            if (match == null) {
                continue;
            }
            pendingCommit.put(hit.getKey(), new OffsetAndMetadata(match.offset()));
        }
        consumer.commitSync(pendingCommit);
    }
}
开发者ID:epam,项目名称:Lagerta,代码行数:25,代码来源:PublisherKafkaService.java
示例10: pollCommunicateOnce
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Performs one poll round. An empty poll may flag the loader as stalled and notify the lead
 * (once per stall); a non-empty poll clears the stalled flag, forwards the sorted record
 * timestamps to the lead and commits the consumer offsets.
 *
 * @param consumer consumer to poll and commit on
 */
private void pollCommunicateOnce(Consumer<ByteBuffer, ByteBuffer> consumer) {
    ConsumerRecords<ByteBuffer, ByteBuffer> batch = consumer.poll(POLL_TIMEOUT);

    if (batch.isEmpty()) {
        boolean becameStalled = !stalled && checkStalled(consumer);
        if (becameStalled) {
            LOGGER.info("[I] Loader stalled {} / {}", f(leadId), f(localLoaderId));
            stalled = true;
            lead.notifyLocalLoaderStalled(leadId, localLoaderId);
        }
        // ToDo: Consider sending empty messages for heartbeat sake.
        return;
    }

    if (stalled) {
        stalled = false;
    }

    // Record timestamps are what gets reported to the lead, in ascending order.
    MutableLongList ids = new LongArrayList(batch.count());
    for (ConsumerRecord<ByteBuffer, ByteBuffer> polled : batch) {
        ids.add(polled.timestamp());
    }
    ids.sortThis();

    lead.updateInitialContext(localLoaderId, ids);
    consumer.commitSync();
}
开发者ID:epam,项目名称:Lagerta,代码行数:25,代码来源:LocalLeadContextLoader.java
示例11: StandbyTask
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
* Create {@link StandbyTask} with its assigned partitions
*
* @param id the ID of this task
* @param applicationId the ID of the stream processing application
* @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
* @param consumer the instance of {@link Consumer}
* @param changelogReader the {@link ChangelogReader} forwarded to the superclass constructor
* @param config the {@link StreamsConfig} specified by the user
* @param metrics the {@link StreamsMetrics} created by the thread
* @param stateDirectory the {@link StateDirectory} created by the thread
*/
StandbyTask(final TaskId id,
final String applicationId,
final Collection<TopicPartition> partitions,
final ProcessorTopology topology,
final Consumer<byte[], byte[]> consumer,
final ChangelogReader changelogReader,
final StreamsConfig config,
final StreamsMetrics metrics,
final StateDirectory stateDirectory) {
// NOTE(review): the literal `true` and `null` arguments are defined by the superclass
// constructor, which is not visible here; presumably `true` marks the task as standby. Confirm.
super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, null, config);
// initialize the topology with its own context
processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
log.debug("{} Initializing", logPrefix);
// State stores are initialized before the context is marked initialized.
initializeStateStores();
processorContext.initialized();
// Expose the state manager's checkpointed offsets through an unmodifiable map view.
checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:32,代码来源:StandbyTask.java
示例12: GlobalStreamThread
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Creates the global stream thread, named after {@code threadClientId}, and prepares its
 * metrics and record cache.
 *
 * @param topology processor topology run by this thread
 * @param config streams configuration; supplies the cache budget and the stream thread count
 * @param globalConsumer consumer stored for use by this thread
 * @param stateDirectory state directory stored for use by this thread
 * @param metrics metrics registry wrapped by the thread's {@code StreamsMetricsImpl}
 * @param time time source stored for use by this thread
 * @param threadClientId name passed to the superclass; also used as the metrics "client-id" tag
 *        and the cache name
 */
public GlobalStreamThread(final ProcessorTopology topology,
                          final StreamsConfig config,
                          final Consumer<byte[], byte[]> globalConsumer,
                          final StateDirectory stateDirectory,
                          final Metrics metrics,
                          final Time time,
                          final String threadClientId) {
    super(threadClientId);
    this.topology = topology;
    this.config = config;
    this.consumer = globalConsumer;
    this.stateDirectory = stateDirectory;
    this.time = time;

    // The total cache budget is divided by (stream thread count + 1); never negative.
    final long totalCacheBytes = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
    final int cacheShares = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1;
    final long cacheSizeBytes = Math.max(0, totalCacheBytes / cacheShares);

    this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
    this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:GlobalStreamThread.java
示例13: waitUntilMinKeyValueRecordsReceived
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Wait until enough data (key-value records) has been consumed.
 *
 * <p>A consumer is created from {@code consumerConfig} and closed before returning. Records
 * are accumulated across repeated {@code readKeyValues} calls until at least
 * {@code expectedNumRecords} have been collected.
 *
 * @param consumerConfig Kafka Consumer configuration
 * @param topic Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime Upper bound in waiting time in milliseconds
 * @return All the records consumed (never {@code null})
 * @throws InterruptedException if interrupted while waiting
 * @throws AssertionError if the given wait time elapses
 */
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                              final String topic,
                                                                              final int expectedNumRecords,
                                                                              final long waitTime) throws InterruptedException {
    final List<KeyValue<K, V>> accumData = new ArrayList<>();
    try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<KeyValue<K, V>> readData =
                    readKeyValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        // This string is built before any polling happens, so it must not embed the contents
        // of accumData: that would always report zero records received.
        final String conditionDetails =
            "Did not receive all " + expectedNumRecords + " records from topic " + topic;
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
    }
    return accumData;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:34,代码来源:IntegrationTestUtils.java
示例14: waitUntilMinValuesRecordsReceived
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Wait until enough data (value records) has been consumed.
 *
 * <p>A consumer is created from {@code consumerConfig} and closed before returning. Values
 * are accumulated across repeated {@code readValues} calls until at least
 * {@code expectedNumRecords} have been collected.
 *
 * @param consumerConfig Kafka Consumer configuration
 * @param topic Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime Upper bound in waiting time in milliseconds
 * @return All the values consumed (never {@code null})
 * @throws InterruptedException if interrupted while waiting
 * @throws AssertionError if the given wait time elapses
 */
public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                            final String topic,
                                                            final int expectedNumRecords,
                                                            final long waitTime) throws InterruptedException {
    final List<V> accumData = new ArrayList<>();
    try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<V> readData =
                    readValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        // This string is built before any polling happens, so it must not embed the contents
        // of accumData: that would always report zero records received.
        final String conditionDetails =
            "Did not receive all " + expectedNumRecords + " records from topic " + topic;
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
    }
    return accumData;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:34,代码来源:IntegrationTestUtils.java
示例15: readKeyValues
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Reads key/value pairs from {@code topic}: the consumer is subscribed to the topic and then
 * polled in 100 ms slices until either the summed poll timeouts reach {@code waitTime} or
 * {@code continueConsuming} signals that the message limit has been met.
 *
 * @param topic Kafka topic to subscribe to and read from
 * @param consumer Kafka consumer used for polling
 * @param waitTime Upper bound, in milliseconds, on the summed poll timeouts
 * @param maxMessages Message limit handed to {@code continueConsuming}
 * @return The KeyValue elements read, in the order they were polled
 */
private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
        final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
    final int sliceMs = 100;
    final List<KeyValue<K, V>> result = new ArrayList<>();
    consumer.subscribe(Collections.singletonList(topic));
    for (int spentMs = 0;
         spentMs < waitTime && continueConsuming(result.size(), maxMessages);
         spentMs += sliceMs) {
        final ConsumerRecords<K, V> slice = consumer.poll(sliceMs);
        for (final ConsumerRecord<K, V> entry : slice) {
            result.add(new KeyValue<>(entry.key(), entry.value()));
        }
    }
    return result;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:28,代码来源:IntegrationTestUtils.java
示例16: TestStreamTask
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Test task that forwards its arguments to the superclass constructor, wrapping
 * {@code restoreConsumer} in a {@link StoreChangelogReader} and supplying a fresh
 * {@link MockTime}.
 *
 * @param id task id passed to the superclass
 * @param applicationId application id passed to the superclass
 * @param partitions partitions passed to the superclass
 * @param topology processor topology passed to the superclass
 * @param consumer consumer passed to the superclass
 * @param producer producer passed to the superclass
 * @param restoreConsumer consumer wrapped by the {@link StoreChangelogReader}
 * @param config streams configuration passed to the superclass
 * @param metrics streams metrics passed to the superclass
 * @param stateDirectory state directory passed to the superclass
 */
TestStreamTask(final TaskId id,
final String applicationId,
final Collection<TopicPartition> partitions,
final ProcessorTopology topology,
final Consumer<byte[], byte[]> consumer,
final Producer<byte[], byte[]> producer,
final Consumer<byte[], byte[]> restoreConsumer,
final StreamsConfig config,
final StreamsMetrics metrics,
final StateDirectory stateDirectory) {
super(id,
applicationId,
partitions,
topology,
consumer,
// NOTE(review): 5000 is presumably a timeout in milliseconds; confirm against StoreChangelogReader.
new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000),
config,
metrics,
stateDirectory,
// NOTE(review): the meaning of this null argument is defined by the superclass constructor (not visible here).
null,
new MockTime(),
producer);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:24,代码来源:StreamThreadTest.java
示例17: commitOffsets
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Commits the supplied offsets on the consumer, synchronously or asynchronously.
 * Does nothing when {@code offsetsToCommit} is {@code null} or empty. For asynchronous
 * commits the outcome is logged: a warning on failure, a debug message on success.
 *
 * @param consumer consumer to commit on
 * @param offsetsToCommit offsets to commit, keyed by partition; may be {@code null}
 * @param async {@code true} to use {@code commitAsync}, {@code false} to use {@code commitSync}
 */
static void commitOffsets(Consumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
        boolean async) {
    boolean nothingToCommit = offsetsToCommit == null || offsetsToCommit.isEmpty();
    if (nothingToCommit) {
        return;
    }

    if (!async) {
        consumer.commitSync(offsetsToCommit);
        return;
    }

    consumer.commitAsync(offsetsToCommit, (committed, failure) -> {
        if (failure == null) {
            LOG.debug("Successfully committed offset(s) for {} TopicPartition(s): {}",
                    committed.size(), offsetsAsString(committed));
            return;
        }
        LOG.warn("Unable to commit offsets for {} TopicPartition(s) {}: {}",
                committed.size(),
                offsetsAsString(committed),
                failure.getMessage(),
                failure);
    });
}
开发者ID:rmap-project,项目名称:rmap,代码行数:27,代码来源:KafkaUtils.java
示例18: testOnPartitionsRevoked
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Verifies that revoking a partition makes {@code SaveOffsetOnRebalance} read the consumer's
 * position for it and synchronously commit that offset.
 */
@Test
public void testOnPartitionsRevoked() throws Exception {
    String topic = "topic";
    int partition = 0;
    long offset = 21;
    TopicPartition tp = new TopicPartition(topic, partition);
    OffsetAndMetadata commitOffsetMd = new OffsetAndMetadata(offset, null);

    Consumer consumer = mock(Consumer.class);
    OffsetLookup lookup = mock(OffsetLookup.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);
    when(consumer.position(tp)).thenReturn(offset);

    underTest.onPartitionsRevoked(Collections.singleton(tp));

    verify(consumer).position(tp);
    // A plain single-entry map replaces the former double-brace HashMap, which created an
    // anonymous subclass; map equality is content-based, so the verification is unchanged.
    verify(consumer).commitSync(Collections.singletonMap(tp, commitOffsetMd));
}
开发者ID:rmap-project,项目名称:rmap,代码行数:24,代码来源:SaveOffsetOnRebalanceTest.java
示例19: testOnPartitionsAssignedNegativeOffsetLookup
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Verifies that when the offset lookup returns -1 on partition assignment, the consumer is
 * moved to the beginning or the end of the partition according to
 * {@code SaveOffsetOnRebalance.DEFAULT_SEEK_BEHAVIOR}, and its position is then read.
 */
@Test
public void testOnPartitionsAssignedNegativeOffsetLookup() throws Exception {
    String topic = "topic";
    int partition = 0;
    long offset = -1;
    TopicPartition assigned = new TopicPartition(topic, partition);

    OffsetLookup lookup = mock(OffsetLookup.class);
    Consumer consumer = mock(Consumer.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);
    when(lookup.lookupOffset(topic, partition, Seek.LATEST)).thenReturn(offset);

    underTest.onPartitionsAssigned(Collections.singleton(assigned));

    verify(lookup).lookupOffset(topic, partition, Seek.LATEST);
    if (SaveOffsetOnRebalance.DEFAULT_SEEK_BEHAVIOR != Seek.EARLIEST) {
        verify(consumer).seekToEnd(Collections.singleton(assigned));
    } else {
        verify(consumer).seekToBeginning(Collections.singleton(assigned));
    }
    verify(consumer).position(assigned);
}
开发者ID:rmap-project,项目名称:rmap,代码行数:24,代码来源:SaveOffsetOnRebalanceTest.java
示例20: performOffsetLookupTest
import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Shared check: when the lookup returns {@code offset} on partition assignment, the consumer
 * must be asked to seek to {@code offset + 1} and its position must then be read.
 *
 * @param offset offset the mocked lookup returns
 */
private static void performOffsetLookupTest(long offset) {
    String topic = "topic";
    int partition = 0;
    TopicPartition assigned = new TopicPartition(topic, partition);

    OffsetLookup lookup = mock(OffsetLookup.class);
    Consumer consumer = mock(Consumer.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);
    when(lookup.lookupOffset(topic, partition, Seek.LATEST)).thenReturn(offset);

    underTest.onPartitionsAssigned(Collections.singleton(assigned));

    verify(lookup).lookupOffset(topic, partition, Seek.LATEST);
    verify(consumer).seek(assigned, offset + 1);
    verify(consumer).position(assigned);
}
开发者ID:rmap-project,项目名称:rmap,代码行数:18,代码来源:SaveOffsetOnRebalanceTest.java
注:本文中的org.apache.kafka.clients.consumer.Consumer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论