• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java Consumer类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.kafka.clients.consumer.Consumer的典型用法代码示例。如果您正苦于以下问题:Java Consumer类的具体用法?Java Consumer怎么用?Java Consumer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Consumer类属于org.apache.kafka.clients.consumer包,在下文中一共展示了Consumer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: execute

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
@Override
public void execute(ServiceContext ctx) throws Exception {
    active = true;
    receivedIds = new GridConcurrentHashSet<>();

    Properties config = new Properties();
    config.putAll(dataRecoveryConfig.getConsumerConfig());
    config.put(ConsumerConfig.GROUP_ID_CONFIG, ReceivedTransactionsListenerImpl.class.getSimpleName());
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(config)) {
        consumer.subscribe(Arrays.asList(dataRecoveryConfig.getRemoteTopic(), dataRecoveryConfig.getReconciliationTopic()));
        while (active) {
            ConsumerRecords<ByteBuffer, ByteBuffer> poll = consumer.poll(500);
            for (ConsumerRecord<ByteBuffer, ByteBuffer> record : poll) {
                TransactionMetadata metadata = serializer.deserialize(record.key());
                receivedIds.add(metadata.getTransactionId());
            }
            consumer.commitSync();
        }
    }
}
 
开发者ID:epam,项目名称:Lagerta,代码行数:22,代码来源:ReceivedTransactionsListenerImpl.java


示例2: readKeyValues

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
 * are already configured in the consumer).
 *
 * @param topic          Kafka topic to read messages from
 * @param consumer       Kafka consumer
 * @param waitTime       Maximum wait time in milliseconds
 * @param maxMessages    Maximum number of messages to read via the consumer
 * @return The KeyValue elements retrieved via the consumer
 */
private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
                                                         final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
  final List<KeyValue<K, V>> consumedValues;
  consumer.subscribe(Collections.singletonList(topic));
  final int pollIntervalMs = 100;
  consumedValues = new ArrayList<>();
  int totalPollTimeMs = 0;
  while (totalPollTimeMs < waitTime &&
          continueConsuming(consumedValues.size(), maxMessages)) {
    totalPollTimeMs += pollIntervalMs;
    final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
    for (final ConsumerRecord<K, V> record : records) {
      consumedValues.add(new KeyValue<>(record.key(), record.value()));
    }
  }
  return consumedValues;
}
 
开发者ID:confluentinc,项目名称:ksql,代码行数:28,代码来源:IntegrationTestUtils.java


示例3: makeQueryRepository

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * This test simulates executing many commands and each of them use their own InMemoryQueryRepository. We need
 * to re-create the repo outside of the command to ensure it has the most up to date values inside of it.
 *
 * @param ryaInstance - The rya instance the repository is connected to. (not null)
 * @param createTopic - Set this to true if the topic doesn't exist yet.
 */
private QueryRepository makeQueryRepository(final String ryaInstance, final boolean createTopic) {
    requireNonNull(ryaInstance);

    // Make sure the topic that the change log uses exists.
    final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
    if(createTopic) {
        kafka.createTopic(changeLogTopic);
    }

    // Setup the QueryRepository used by the test.
    final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
    return new InMemoryQueryRepository(changeLog);
}
 
开发者ID:apache,项目名称:incubator-rya,代码行数:23,代码来源:DeleteQueryCommandIT.java


示例4: afterPropertiesSet

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
@Override
@SuppressWarnings("unchecked")
public void afterPropertiesSet() throws Exception {
    if (topics == null && topicPatternString == null) {
        throw new IllegalArgumentException("topic info must not be null");
    }
    Assert.notEmpty(configs, "configs must not be null");
    Assert.notNull(payloadListener, "payloadListener must be null");
    String valueDeserializerKlass = (String) configs.get("value.deserializer");
    configs.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(configs);

    Deserializer valueDeserializer = createDeserializer(valueDeserializerKlass);
    valueDeserializer.configure(configs, false);

    if (topics != null) {
        listenableConsumer =
                new ListenableTracingConsumer<>(consumer, Arrays.asList(topics), valueDeserializer);
    } else {
        listenableConsumer =
                new ListenableTracingConsumer<>(consumer, Pattern.compile(topicPatternString), valueDeserializer);
    }
    if (payloadListener != null) {
        listenableConsumer.addListener(payloadListener);
    }
    listenableConsumer.start();
}
 
开发者ID:YanXs,项目名称:nighthawk,代码行数:28,代码来源:ListenableConsumerFactoryBean.java


示例5: loadRunningConf

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
private void loadRunningConf(String reloadMsgJson) {
    String notifyEvtName = reloadMsgJson == null ? "loaded" : "reloaded";
    try {
        //加载zk中的配置信息
        this.confMap = FullPullHelper.loadConfProps(zkConnect, topologyId, zkTopoRoot, Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC);
        this.MAX_FLOW_THRESHOLD = (Integer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_MAX_FLOW_THRESHOLD);
        LOG.info("MAX_FLOW_THRESHOLD is {} on DataShardsSplittingSpout.loadRunningConf", MAX_FLOW_THRESHOLD);
        this.commonProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON);
        this.dsName = commonProps.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME);
        this.fullPullSrcTopic = commonProps.getProperty(Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC);
        this.consumer = (Consumer<String, byte[]>) confMap.get(FullPullHelper.RUNNING_CONF_KEY_CONSUMER);
        this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE);

        LOG.info("Running Config is " + notifyEvtName + " successfully for DataShardsSplittingSpout!");
    } catch (Exception e) {
        LOG.error(notifyEvtName + "ing running configuration encountered Exception!", e);
    } finally {
        FullPullHelper.saveReloadStatus(reloadMsgJson, "splitting-spout", true, zkConnect);
    }
}
 
开发者ID:BriData,项目名称:DBus,代码行数:21,代码来源:DataShardsSplittingSpout.java


示例6: loadRunningConf

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
private void loadRunningConf(String reloadMsgJson) {
    try {
        this.confMap = FullPullHelper.loadConfProps(zkConnect, topologyId, zkTopoRoot, Constants.ZkTopoConfForFullPull.FULL_PULL_MEDIANT_TOPIC);
        this.MAX_FLOW_THRESHOLD = (Integer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_MAX_FLOW_THRESHOLD);
        this.commonProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON);
        this.dsName = commonProps.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME);
        this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE);
        this.consumer =  (Consumer<String, byte[]>)confMap.get(FullPullHelper.RUNNING_CONF_KEY_CONSUMER);

        String notifyEvtName = reloadMsgJson == null ? "loaded" : "reloaded";
        LOG.info("Running Config is " + notifyEvtName + " successfully for DataPullingSpout!");
    } catch (Exception e) {
        LOG.error("Loading running configuration encountered Exception!", e);
        throw e;
    } finally {
        FullPullHelper.saveReloadStatus(reloadMsgJson, "pulling-spout", true, zkConnect);
    }
}
 
开发者ID:BriData,项目名称:DBus,代码行数:19,代码来源:DataPullingSpout.java


示例7: createConsumer

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * createConsumer - create a new consumer
 * @return
 * @throws Exception
 */
private Consumer<String, String> createConsumer() throws Exception {
    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
   
    // Seek to end automatically
    List<TopicPartition> pts = topics.stream().map(s -> new TopicPartition(s, 0)).collect(Collectors.toList());
    consumer.assign(pts);
    if(rollBack==0){
        consumer.seekToEnd(pts);  
    }else{
        for (TopicPartition topicPartition : pts) {
            consumer.seek(topicPartition, consumer.position(topicPartition)-rollBack);
            logger.info("Consumer seeked to -500000 :"+consumer.position(topicPartition));
        }    
    }  
    return consumer;
}
 
开发者ID:BriData,项目名称:DBus,代码行数:23,代码来源:FullPullerPerfChecker.java


示例8: createConsumer

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * createConsumer - create a new consumer
 * @return
 * @throws Exception
 */
private Consumer<String, String> createConsumer() throws Exception {

    // Seek to end automatically
    TopicPartition dataTopicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> topics = Arrays.asList(dataTopicPartition);

    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.assign(topics);

    if(offset == -1){
        consumer.seekToEnd(topics);
        logger.info("Consumer seek to end");
    }else{
        consumer.seek(dataTopicPartition, offset);
        logger.info(String.format("read changed as offset: %s", consumer.position(dataTopicPartition)));
    }
    return consumer;
}
 
开发者ID:BriData,项目名称:DBus,代码行数:25,代码来源:KafkaReader.java


示例9: seekToTransaction

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
    String groupId) {
    String topic = config.getLocalTopic();
    Properties consumerProperties = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);

    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProperties)) {
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        Map<TopicPartition, Long> seekMap = new HashMap<>(partitionInfos.size());

        for (PartitionInfo partitionInfo : partitionInfos) {
            seekMap.put(new TopicPartition(topic, partitionInfo.partition()), transactionId);
        }
        consumer.assign(seekMap.keySet());
        Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
            if (entry.getValue() != null) {
                offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()));
            }
        }
        consumer.commitSync(offsetsToCommit);
    }
}
 
开发者ID:epam,项目名称:Lagerta,代码行数:25,代码来源:PublisherKafkaService.java


示例10: pollCommunicateOnce

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
private void pollCommunicateOnce(Consumer<ByteBuffer, ByteBuffer> consumer) {
    ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(POLL_TIMEOUT);

    if (records.isEmpty()) {
        if (!stalled && checkStalled(consumer)) {
            LOGGER.info("[I] Loader stalled {} / {}", f(leadId), f(localLoaderId));
            stalled = true;
            lead.notifyLocalLoaderStalled(leadId, localLoaderId);
        }
        // ToDo: Consider sending empty messages for heartbeat sake.
        return;
    }
    if (stalled) {
        stalled = false;
    }
    MutableLongList committedIds = new LongArrayList(records.count());

    for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
        committedIds.add(record.timestamp());
    }
    committedIds.sortThis();
    lead.updateInitialContext(localLoaderId, committedIds);
    consumer.commitSync();
}
 
开发者ID:epam,项目名称:Lagerta,代码行数:25,代码来源:LocalLeadContextLoader.java


示例11: StandbyTask

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Create {@link StandbyTask} with its assigned partitions
 *
 * @param id             the ID of this task
 * @param applicationId  the ID of the stream processing application
 * @param partitions     the collection of assigned {@link TopicPartition}
 * @param topology       the instance of {@link ProcessorTopology}
 * @param consumer       the instance of {@link Consumer}
 * @param config         the {@link StreamsConfig} specified by the user
 * @param metrics        the {@link StreamsMetrics} created by the thread
 * @param stateDirectory the {@link StateDirectory} created by the thread
 */
StandbyTask(final TaskId id,
            final String applicationId,
            final Collection<TopicPartition> partitions,
            final ProcessorTopology topology,
            final Consumer<byte[], byte[]> consumer,
            final ChangelogReader changelogReader,
            final StreamsConfig config,
            final StreamsMetrics metrics,
            final StateDirectory stateDirectory) {
    super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, null, config);

    // initialize the topology with its own context
    processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);

    log.debug("{} Initializing", logPrefix);
    initializeStateStores();
    processorContext.initialized();
    checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:32,代码来源:StandbyTask.java


示例12: GlobalStreamThread

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
public GlobalStreamThread(final ProcessorTopology topology,
                          final StreamsConfig config,
                          final Consumer<byte[], byte[]> globalConsumer,
                          final StateDirectory stateDirectory,
                          final Metrics metrics,
                          final Time time,
                          final String threadClientId) {
    super(threadClientId);
    this.time = time;
    this.config = config;
    this.topology = topology;
    this.consumer = globalConsumer;
    this.stateDirectory = stateDirectory;
    long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
            (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
    this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
    this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:GlobalStreamThread.java


示例13: waitUntilMinKeyValueRecordsReceived

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Wait until enough data (key-value records) has been consumed.
 *
 * @param consumerConfig     Kafka Consumer configuration
 * @param topic              Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime           Upper bound in waiting time in milliseconds
 * @return All the records consumed, or null if no records are consumed
 * @throws InterruptedException
 * @throws AssertionError       if the given wait time elapses
 */
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                              final String topic,
                                                                              final int expectedNumRecords,
                                                                              final long waitTime) throws InterruptedException {
    final List<KeyValue<K, V>> accumData = new ArrayList<>();
    try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<KeyValue<K, V>> readData =
                    readKeyValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        final String conditionDetails =
            "Expecting " + expectedNumRecords + " records from topic " + topic +
                " while only received " + accumData.size() + ": " + accumData;
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
    }
    return accumData;
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:34,代码来源:IntegrationTestUtils.java


示例14: waitUntilMinValuesRecordsReceived

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Wait until enough data (value records) has been consumed.
 *
 * @param consumerConfig     Kafka Consumer configuration
 * @param topic              Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime           Upper bound in waiting time in milliseconds
 * @return All the records consumed, or null if no records are consumed
 * @throws InterruptedException
 * @throws AssertionError       if the given wait time elapses
 */
public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                            final String topic,
                                                            final int expectedNumRecords,
                                                            final long waitTime) throws InterruptedException {
    final List<V> accumData = new ArrayList<>();
    try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<V> readData =
                    readValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        final String conditionDetails =
            "Expecting " + expectedNumRecords + " records from topic " + topic +
                " while only received " + accumData.size() + ": " + accumData;
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
    }
    return accumData;
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:34,代码来源:IntegrationTestUtils.java


示例15: readKeyValues

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
/**
 * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
 * are already configured in the consumer).
 *
 * @param topic          Kafka topic to read messages from
 * @param consumer       Kafka consumer
 * @param waitTime       Maximum wait time in milliseconds
 * @param maxMessages    Maximum number of messages to read via the consumer
 * @return The KeyValue elements retrieved via the consumer
 */
private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
    final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
    final List<KeyValue<K, V>> consumedValues;
    consumer.subscribe(Collections.singletonList(topic));
    final int pollIntervalMs = 100;
    consumedValues = new ArrayList<>();
    int totalPollTimeMs = 0;
    while (totalPollTimeMs < waitTime &&
        continueConsuming(consumedValues.size(), maxMessages)) {
        totalPollTimeMs += pollIntervalMs;
        final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
        for (final ConsumerRecord<K, V> record : records) {
            consumedValues.add(new KeyValue<>(record.key(), record.value()));
        }
    }
    return consumedValues;
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:28,代码来源:IntegrationTestUtils.java


示例16: TestStreamTask

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
TestStreamTask(final TaskId id,
               final String applicationId,
               final Collection<TopicPartition> partitions,
               final ProcessorTopology topology,
               final Consumer<byte[], byte[]> consumer,
               final Producer<byte[], byte[]> producer,
               final Consumer<byte[], byte[]> restoreConsumer,
               final StreamsConfig config,
               final StreamsMetrics metrics,
               final StateDirectory stateDirectory) {
    super(id,
        applicationId,
        partitions,
        topology,
        consumer,
        new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000),
        config,
        metrics,
        stateDirectory,
        null,
        new MockTime(),
        producer);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:24,代码来源:StreamThreadTest.java


示例17: commitOffsets

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
static void commitOffsets(Consumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
                          boolean async) {

    if (offsetsToCommit == null || offsetsToCommit.isEmpty()) {
        return;
    }

    OffsetCommitCallback callback = (offsets, exception) -> {
        if (exception != null) {
            LOG.warn("Unable to commit offsets for {} TopicPartition(s) {}: {}",
                    offsets.size(),
                    offsetsAsString(offsets),
                    exception.getMessage(),
                    exception);
        } else {
            LOG.debug("Successfully committed offset(s) for {} TopicPartition(s): {}",
                    offsets.size(), offsetsAsString(offsets));
        }
    };

    if (async) {
        consumer.commitAsync(offsetsToCommit, callback);
    } else {
        consumer.commitSync(offsetsToCommit);
    }
}
 
开发者ID:rmap-project,项目名称:rmap,代码行数:27,代码来源:KafkaUtils.java


示例18: testOnPartitionsRevoked

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
@Test
@SuppressWarnings("serial")
public void testOnPartitionsRevoked() throws Exception {
    String topic = "topic";
    int partition = 0;
    long offset = 21;
    TopicPartition tp = new TopicPartition(topic, partition);
    OffsetAndMetadata commitOffsetMd = new OffsetAndMetadata(offset, null);
    Consumer consumer = mock(Consumer.class);
    OffsetLookup lookup = mock(OffsetLookup.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);

    when(consumer.position(tp)).thenReturn(offset);

    underTest.onPartitionsRevoked(Collections.singleton(tp));

    verify(consumer).position(tp);
    verify(consumer).commitSync(new HashMap(){
            {
                put(tp, commitOffsetMd);
            }
    });
}
 
开发者ID:rmap-project,项目名称:rmap,代码行数:24,代码来源:SaveOffsetOnRebalanceTest.java


示例19: testOnPartitionsAssignedNegativeOffsetLookup

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
@Test
public void testOnPartitionsAssignedNegativeOffsetLookup() throws Exception {
    long offset = -1;
    String topic = "topic";
    int partition = 0;
    TopicPartition tp = new TopicPartition(topic, partition);

    Consumer consumer = mock(Consumer.class);
    OffsetLookup lookup = mock(OffsetLookup.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);

    when(lookup.lookupOffset(topic, partition, Seek.LATEST)).thenReturn(offset);

    underTest.onPartitionsAssigned(Collections.singleton(tp));

    verify(lookup).lookupOffset(topic, partition, Seek.LATEST);
    if (SaveOffsetOnRebalance.DEFAULT_SEEK_BEHAVIOR == Seek.EARLIEST) {
        verify(consumer).seekToBeginning(Collections.singleton(tp));
    } else {
        verify(consumer).seekToEnd(Collections.singleton(tp));
    }
    verify(consumer).position(tp);
}
 
开发者ID:rmap-project,项目名称:rmap,代码行数:24,代码来源:SaveOffsetOnRebalanceTest.java


示例20: performOffsetLookupTest

import org.apache.kafka.clients.consumer.Consumer; //导入依赖的package包/类
private static void performOffsetLookupTest(long offset) {
    String topic = "topic";
    int partition = 0;
    TopicPartition tp = new TopicPartition(topic, partition);

    Consumer consumer = mock(Consumer.class);
    OffsetLookup lookup = mock(OffsetLookup.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);

    when(lookup.lookupOffset(topic, partition, Seek.LATEST)).thenReturn(offset);

    underTest.onPartitionsAssigned(Collections.singleton(tp));

    verify(lookup).lookupOffset(topic, partition, Seek.LATEST);
    verify(consumer).seek(tp, offset + 1);
    verify(consumer).position(tp);
}
 
开发者ID:rmap-project,项目名称:rmap,代码行数:18,代码来源:SaveOffsetOnRebalanceTest.java



注:本文中的org.apache.kafka.clients.consumer.Consumer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java SnapshotCreationException类代码示例发布时间:2022-05-21
下一篇:
Java ProtocolUtils类代码示例发布时间:2022-05-21
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap