
Java FetchRequest Class Code Examples


This article collects typical usage examples of the Java class kafka.api.FetchRequest. If you have been wondering how the FetchRequest class is used in practice, or where to find working examples of it, the curated code examples below may help.



The FetchRequest class belongs to the kafka.api package. A total of 20 code examples of the class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Java code examples.

Example 1: main

import kafka.api.FetchRequest; // import the required package/class
public static void main(String[] args) throws Exception {
	final String topic = "test2";
	String clientId = "LowLevelConsumerClient1";
	SimpleConsumer simpleConsumer = new SimpleConsumer(
			"192.168.1.186", 9092, 6000000, 64 * 1000000, clientId);
	FetchRequest req = new FetchRequestBuilder().clientId(clientId)
							.addFetch(topic, 0, 0L, 1000000)
							.addFetch(topic, 1, 0L, 1000000)
							.addFetch(topic, 2, 0L, 1000000)
							.build();
	FetchResponse rep = simpleConsumer.fetch(req);						
	ByteBufferMessageSet messageSet = rep.messageSet(topic, 0);
	for(MessageAndOffset messageAndOffset : messageSet) {
		ByteBuffer payload = messageAndOffset.message().payload();
		long offset = messageAndOffset.offset();
		byte[] bytes = new byte[payload.limit()];
		payload.get(bytes);
		System.out.println("Offset : " + offset + ", Payload : " + new String(bytes, "UTF-8"));
	}
}
 
Author: walle-liao | Project: jaf-examples | Lines: 21 | Source: LowLevelConsumerDemo.java


Example 2: main

import kafka.api.FetchRequest; // import the required package/class
public static void main(String[] args) throws Exception {
	final String topic = "topic1";
	String clientID = "DemoLowLevelConsumer1";
	SimpleConsumer simpleConsumer = new SimpleConsumer("kafka0", 9092, 100000, 64 * 1000000, clientID);
	FetchRequest req = new FetchRequestBuilder().clientId(clientID)
			.addFetch(topic, 0, 0L, 50).addFetch(topic, 1, 0L, 5000).addFetch(topic, 2, 0L, 1000000).build();
	FetchResponse fetchResponse = simpleConsumer.fetch(req);
	ByteBufferMessageSet messageSet = (ByteBufferMessageSet) fetchResponse.messageSet(topic, 0);
	for (MessageAndOffset messageAndOffset : messageSet) {
		ByteBuffer payload = messageAndOffset.message().payload();
		long offset = messageAndOffset.offset();
		byte[] bytes = new byte[payload.limit()];
		payload.get(bytes);
		System.out.println("Offset:" + offset + ", Payload:" + new String(bytes, "UTF-8"));
	}
}
 
Author: habren | Project: KafkaExample | Lines: 17 | Source: DemoLowLevelConsumer.java


Example 3: fetchLatestRecordPayloadBytes

import kafka.api.FetchRequest; // import the required package/class
private byte[] fetchLatestRecordPayloadBytes(SimpleConsumer kafkaConsumer) {
  FetchRequest fetchRequest = new FetchRequestBuilder().addFetch(destinationTopic, 0, 0, 1000000).build();
  FetchResponse response = kafkaConsumer.fetch(fetchRequest);

  Iterator<MessageAndOffset> messageSetItr = response.messageSet(destinationTopic, 0).iterator();

  // Fast forward to the message at the latest offset in the topic
  MessageAndOffset latestMessage = new MessageAndOffset(new Message(new byte[] { }), 0L);
  while (messageSetItr.hasNext()) {
    latestMessage = messageSetItr.next();
  }

  ByteBuffer payload = latestMessage.message().payload();
  byte[] bytes = new byte[payload.limit()];
  payload.get(bytes);
  return bytes;
}
 
Author: verisign | Project: storm-graphite | Lines: 18 | Source: BaseKafkaReporterTest.java


Example 4: buildFetchRequest

import kafka.api.FetchRequest; // import the required package/class
private FetchRequest buildFetchRequest(long offset) {
  //1. maxWaitTime is the maximum amount of time in milliseconds to block waiting if insufficient data is
  //   available at the time the request is issued.

  //2. minFetchSize is the minimum number of bytes of messages that must be available to give a response. If the
  //   client sets this to 0 the server will always respond immediately, however if there is no new data since their
  //   last request they will just get back empty message sets. If this is set to 1, the server will respond as soon
  //   as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher
  //   values in combination with the timeout the consumer can tune for throughput and trade a little additional
  //   latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k
  //   would allow the server to wait up to 100ms to try to accumulate 64k of data before responding).

  //3. maxFetchSize is the maximum bytes to include in the message set for this partition.
  //   This helps bound the size of the response.
  LOG.info("Building fetch request with clientId {}, minBytes {}, maxWait {}, topic {}, partition {}, offset {}, " +
    "max fetch size {}.", clientName, minFetchSize, maxWaitTime, topic, partition, offset, maxFetchSize);
  return new FetchRequestBuilder()
    .clientId(clientName)
    .minBytes(minFetchSize)
    .maxWait(maxWaitTime)
    .addFetch(topic, partition, offset, maxFetchSize)
    .build();
}
 
Author: streamsets | Project: datacollector | Lines: 24 | Source: KafkaLowLevelConsumer08.java
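The maxWaitTime/minFetchSize trade-off described in the comments above maps directly onto the builder calls. The following is a minimal runnable sketch of the concrete tuning the comment mentions (100 ms wait, 64 KB minimum); the class name, client id, topic, and fetch sizes are illustrative assumptions, not values from the original project.

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;

public class ThroughputTunedFetchSketch {
    public static void main(String[] args) {
        // Ask the broker to wait up to 100 ms to accumulate at least 64 KB
        // before responding, trading a little latency for larger batches.
        FetchRequest request = new FetchRequestBuilder()
                .clientId("tuning-demo")                 // hypothetical client id
                .maxWait(100)                            // maxWaitTime, in milliseconds
                .minBytes(64 * 1024)                     // minFetchSize: 64 KB
                .addFetch("topic1", 0, 0L, 1024 * 1024)  // maxFetchSize: 1 MB for partition 0
                .build();
        System.out.println(request);
    }
}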


Example 5: getMessageSetSince

import kafka.api.FetchRequest; // import the required package/class
private ByteBufferMessageSet getMessageSetSince(long offset, int timeoutInMs)  {
    if (timeoutInMs < 0) {
        throw new IllegalArgumentException(String.format("Timeout must not be lower than 0, timeout is: %d", timeoutInMs));
    }
    FetchRequest request = new FetchRequestBuilder()
            .clientId(generateClientId())
            .addFetch(assignedTopicPartition.topic(), assignedTopicPartition.partition(), offset, consumerConfig.bufferSize())
            .maxWait(timeoutInMs)
            .minBytes(consumerConfig.bufferSize())
            .build();
    FetchResponse response = partitionConsumer.fetch(request);
    if (response.hasError()) {
        short errorCode = response.errorCode(assignedTopicPartition.topic(), assignedTopicPartition.partition());
        // @todo retry during broker failover
        throw new PartitionConsumerException(ErrorMapping.exceptionFor(errorCode));
    }
    return response.messageSet(assignedTopicPartition.topic(), assignedTopicPartition.partition());
}
 
Author: researchgate | Project: kafka-metamorph | Lines: 19 | Source: Kafka08PartitionConsumer.java


Example 6: run

import kafka.api.FetchRequest; // import the required package/class
@Override
  public void run()
  {
    long offset = 0;
    while (isAlive) {
      // create a fetch request for topic "topic1", partition 1, current offset, and fetch size of 1MB
      FetchRequest fetchRequest = new FetchRequestBuilder().clientId("default_client").addFetch("topic1", 1, offset, 1000000).build();

//      FetchRequest fetchRequest = new FetchRequest("topic1", 0, offset, 1000000);

      // get the message set from the consumer and print them out
      ByteBufferMessageSet messages = consumer.fetch(fetchRequest).messageSet("topic1", 1);
      Iterator<MessageAndOffset> itr = messages.iterator();

      while (itr.hasNext() && isAlive) {
        MessageAndOffset msg = itr.next();
        // advance the offset after consuming each message
        offset = msg.offset();
        logger.debug("consumed: {} offset: {}", byteBufferToString(msg.message().payload()).toString(), offset);
        receiveCount++;
      }
    }
  }
 
Author: apache | Project: apex-malhar | Lines: 24 | Source: KafkaSimpleConsumer.java


Example 7: fetch

import kafka.api.FetchRequest; // import the required package/class
/** Fetch the consumed messages at the specified offsets and return them as a map
 * (key: topic name; value: the list of messages returned for that topic).
 * 
 * @param topics The topic names
 * @param partitions The partition of each topic
 * @param offsets The starting offset for each topic
 * @return Map from topic name to the list of fetched messages
 * @throws Exception
 */
static Map<String, List<String>> fetch(SimpleConsumer simpleConsumer, String[] topics, int[] partitions, long[] offsets) throws Exception{
	FetchRequest fetchRequest = getFetchRequest(simpleConsumer,topics, partitions, offsets);
	FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);
	
	Map<String, List<String>> retMap = new HashMap<String, List<String>>();
	for (int i = 0; i < topics.length; i++) {
		String topic = topics[i];
		List<String> list = new ArrayList<String>();
		retMap.put(topic, list);
		
		ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, partitions[i]);
		
		for (MessageAndOffset messageAndOffset : messageSet) {
			ByteBuffer payload = messageAndOffset.message().payload();
			byte[] bytes = new byte[payload.limit()];
			payload.get(bytes);
			String msg = new String(bytes, "UTF-8");
			list.add(msg);
		}
	}
	
	return retMap;
}
 
Author: linzhaoming | Project: easyframe-msg | Lines: 33 | Source: SimpleKafkaHelper.java


Example 8: openFetchRequest

import kafka.api.FetchRequest; // import the required package/class
private void openFetchRequest()
{
    if (messageAndOffsetIterator == null) {
        log.debug("Fetching %d bytes from offset %d (%d - %d). %d messages read so far", KAFKA_READ_BUFFER_SIZE, cursorOffset, split.getStart(), split.getEnd(), totalMessages);
        FetchRequest req = new FetchRequestBuilder()
                .clientId("presto-worker-" + Thread.currentThread().getName())
                .addFetch(split.getTopicName(), split.getPartitionId(), cursorOffset, KAFKA_READ_BUFFER_SIZE)
                .build();

        // TODO - this should look at the actual node this is running on and prefer
        // that copy if running locally. - look into NodeInfo
        SimpleConsumer consumer = consumerManager.getConsumer(split.getNodes().get(0));

        FetchResponse fetchResponse = consumer.fetch(req);
        if (fetchResponse.hasError()) {
            short errorCode = fetchResponse.errorCode(split.getTopicName(), split.getPartitionId());
            log.warn("Fetch response has error: %d", errorCode);
            throw new PrestoException(KAFKA_SPLIT_ERROR, "could not fetch data from Kafka, error code is '" + errorCode + "'");
        }

        messageAndOffsetIterator = fetchResponse.messageSet(split.getTopicName(), split.getPartitionId()).iterator();
    }
}
 
Author: y-lan | Project: presto | Lines: 24 | Source: KafkaRecordSet.java


Example 9: readMessages

import kafka.api.FetchRequest; // import the required package/class
public List<byte[]> readMessages(String topic) {
  SimpleConsumer consumer = new SimpleConsumer("localhost", 6667, 100000, 64 * 1024, "consumer");
  FetchRequest req = new FetchRequestBuilder()
          .clientId("consumer")
          .addFetch(topic, 0, 0, 100000)
          .build();
  FetchResponse fetchResponse = consumer.fetch(req);
  Iterator<MessageAndOffset> results = fetchResponse.messageSet(topic, 0).iterator();
  List<byte[]> messages = new ArrayList<>();
  while(results.hasNext()) {
    ByteBuffer payload = results.next().message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    messages.add(bytes);
  }
  consumer.close();
  return messages;
}
 
Author: apache | Project: metron | Lines: 19 | Source: KafkaComponent.java


Example 10: fetchMore

import kafka.api.FetchRequest; // import the required package/class
private void fetchMore()
{
    FetchRequest fetchRequest = this.builder
            .clientId(split.getClientId())
            .addFetch(split.getTopicName(), split.getPartitionId(),
                    nextFetchOffset, DEFAULT_BUFFER_SIZE).build();
    response = consumer.fetch(fetchRequest);
    this.currentResponseIter = null;
    if (response != null)
    {
        List<MessageAndOffset> currentResponseList = new ArrayList<MessageAndOffset>();
        for (MessageAndOffset messageAndOffset : response.messageSet(
                split.getTopicName(), split.getPartitionId()))
        {
            currentResponseList.add(messageAndOffset);
        }
        this.currentResponseIter = currentResponseList.size() > 0 ? currentResponseList.iterator() : null;
    }
}
 
Author: dropbox | Project: presto-kafka-connector | Lines: 20 | Source: KafkaRecordCursor.java


Example 11: continueItr

import kafka.api.FetchRequest; // import the required package/class
/**
 * THIS METHOD HAS SIDE EFFECTS - it will update {@code currentMessageItr} (if necessary) and then return true iff
 * the iterator still has elements to be read. If you call {@link scala.collection.Iterator#next()} when this method
 * returns false, you risk a {@link NullPointerException} OR a no-more-elements exception.
 * 
 * @return true if you can call {@link scala.collection.Iterator#next()} on {@code currentMessageItr}.
 */
@VisibleForTesting
boolean continueItr() {
    final long remaining = end - currentOffset;
    if (!canCallNext() && remaining > 0) {
        final int theFetchSize = (fetchSize > remaining) ? (int) remaining : fetchSize;
        LOG.debug(String.format("%s fetching %d bytes starting at offset %d", split.toString(), theFetchSize,
                currentOffset));
        final FetchRequest request = new FetchRequest(split.getPartition().getTopic(), split.getPartition()
                .getPartId(), currentOffset, theFetchSize);
        final ByteBufferMessageSet msg = consumer.fetch(request);
        final int errorCode = msg.getErrorCode();
        if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
            return false;
        }
        if (errorCode != ErrorMapping.NoError()) {
            ErrorMapping.maybeThrowException(errorCode);
        } // --> else we try to grab the next iterator
        currentMessageItr = msg.iterator();
        currentOffset += msg.validBytes();
    }
    return canCallNext();
}
 
Author: Conductor | Project: kangaroo | Lines: 30 | Source: KafkaRecordReader.java
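Given the side-effect contract documented above, the intended calling pattern looks roughly like the fragment below. This is a sketch only: it assumes it runs inside the same reader class, reusing the currentMessageItr field and the MessageAndOffset type from the snippet, and is not code from the original project.

// Re-check continueItr() before every next(): it is the call that refreshes
// currentMessageItr once the current fetch has been drained.
while (continueItr()) {
    MessageAndOffset messageAndOffset = currentMessageItr.next(); // safe: continueItr() returned true
    // decode messageAndOffset.message().payload() and emit a record here
}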


Example 12: doWork

import kafka.api.FetchRequest; // import the required package/class
@Override
public void doWork() {
    Utils.inLock(partitionMapLock, new Function0<Object>() {
        @Override
        public Object apply() {
            if (partitionMap.isEmpty())
                Utils.await(partitionMapCond, 200L, TimeUnit.MILLISECONDS);

            Utils.foreach(partitionMap, new Callable2<TopicAndPartition, Long>() {
                @Override
                public void apply(TopicAndPartition topicAndPartition, Long offset) {
                    fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                            offset, fetchSize);
                }
            });
            return null;
        }
    });

    FetchRequest fetchRequest = fetchRequestBuilder.build();
    if (!fetchRequest.requestInfo.isEmpty())
        processFetchRequest(fetchRequest);
}
 
Author: bingoohuang | Project: buka | Lines: 24 | Source: AbstractFetcherThread.java


Example 13: fetchMore

import kafka.api.FetchRequest; // import the required package/class
public boolean fetchMore () throws IOException {
    if (!hasMore()) return false;
    
    FetchRequest fetchRequest = 
        new FetchRequest(_request.getTopic(), _request.getPartition(), _offset, _bufferSize);
    List<FetchRequest> array = new ArrayList<FetchRequest>();
    array.add(fetchRequest);

    long tempTime = System.currentTimeMillis();
    _response = _consumer.multifetch(array);
    if(_response != null)
        _respIterator = _response.iterator();
    _requestTime += (System.currentTimeMillis() - tempTime);
    
    return true;
}
 
Author: yanfang724 | Project: hadoop-consumer | Lines: 17 | Source: KafkaETLContext.java


Example 14: emitPartitionBatchNew

import kafka.api.FetchRequest; // import the required package/class
public static BatchMeta emitPartitionBatchNew(KafkaConfig config, int partition, SimpleConsumer consumer, TransactionAttempt attempt, BatchOutputCollector collector, BatchMeta lastMeta) {
    long offset = 0;
    if(lastMeta!=null) {
        offset = lastMeta.nextOffset;
    }
    ByteBufferMessageSet msgs;
    try {
       msgs = consumer.fetch(new FetchRequest(config.topic, partition % config.partitionsPerHost, offset, config.fetchSizeBytes));
    } catch(Exception e) {
        if(e instanceof ConnectException) {
            throw new FailedFetchException(e);
        } else {
            throw new RuntimeException(e);
        }
    }
    long endoffset = offset;
    for(MessageAndOffset msg: msgs) {
        emit(config, attempt, collector, msg.message());
        endoffset = msg.offset();
    }
    BatchMeta newMeta = new BatchMeta();
    newMeta.offset = offset;
    newMeta.nextOffset = endoffset;
    return newMeta;
}
 
Author: YinYanfei | Project: CadalWorkspace | Lines: 26 | Source: KafkaUtils.java


Example 15: main

import kafka.api.FetchRequest; // import the required package/class
public static void main(String[] args) {
	Properties props = new Properties();
//	props.put("zk.connect", "10.15.62.76:2181");
//	props.put("groupid", "testgroup");

	SimpleConsumer consumer = new SimpleConsumer("10.15.62.70", 9092, 10000, 1024000);
	long offset = 0;
	int count = 0;
//	while (true) {
		// the last argument is the maximum number of bytes of data to fetch in one request
		FetchRequest fetchRequest = new FetchRequest("topic1114", 3, offset, 10000000);
		ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
		for (MessageAndOffset msg : messages) {
			count++;
			ByteBuffer buffer = msg.message().payload();
			byte[] bytes = new byte[buffer.remaining()];
			buffer.get(bytes);
			String str = new String(bytes);
			System.out.println(str);
			offset = msg.offset();
			System.out.println("offset: " + offset);
		}
		System.out.println("------------ count= " + count);
//	}
}
 
Author: YinYanfei | Project: CadalWorkspace | Lines: 26 | Source: mySimpleConsumer.java


Example 16: nextMessageSet

import kafka.api.FetchRequest; // import the required package/class
void nextMessageSet() throws Exception {
  FetchRequest req = 
      new FetchRequestBuilder().
      clientId(name).
      addFetch(topic, partitionMetadata.partitionId(), currentOffset, fetchSize).
      minBytes(1).
      maxWait(1000).
      build();
  
  FetchResponse fetchResponse = consumer.fetch(req);
  if(fetchResponse.hasError()) {
    throw new Exception("TODO: handle the error, reset the consumer....");
  }
  
  currentMessageSet = fetchResponse.messageSet(topic, partitionMetadata.partitionId());
  currentMessageSetIterator = currentMessageSet.iterator();
}
 
Author: DemandCube | Project: Scribengin | Lines: 18 | Source: KafkaPartitionReader.java


Example 17: main

import kafka.api.FetchRequest; // import the required package/class
public static void main(String[] args) throws Exception {
    generateData();

    SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.KAFKA_SERVER_URL,
        KafkaProperties.KAFKA_SERVER_PORT,
        KafkaProperties.CONNECTION_TIMEOUT,
        KafkaProperties.KAFKA_PRODUCER_BUFFER_SIZE,
        KafkaProperties.CLIENT_ID);

    System.out.println("Testing single fetch");
    FetchRequest req = new FetchRequestBuilder()
        .clientId(KafkaProperties.CLIENT_ID)
        .addFetch(KafkaProperties.TOPIC2, 0, 0L, 100)
        .build();
    FetchResponse fetchResponse = simpleConsumer.fetch(req);
    printMessages(fetchResponse.messageSet(KafkaProperties.TOPIC2, 0));

    System.out.println("Testing single multi-fetch");
    Map<String, List<Integer>> topicMap = new HashMap<>();
    topicMap.put(KafkaProperties.TOPIC2, Collections.singletonList(0));
    topicMap.put(KafkaProperties.TOPIC3, Collections.singletonList(0));
    req = new FetchRequestBuilder()
        .clientId(KafkaProperties.CLIENT_ID)
        .addFetch(KafkaProperties.TOPIC2, 0, 0L, 100)
        .addFetch(KafkaProperties.TOPIC3, 0, 0L, 100)
        .build();
    fetchResponse = simpleConsumer.fetch(req);
    int fetchReq = 0;
    for (Map.Entry<String, List<Integer>> entry : topicMap.entrySet()) {
        String topic = entry.getKey();
        for (Integer offset : entry.getValue()) {
            System.out.println("Response from fetch request no: " + ++fetchReq);
            printMessages(fetchResponse.messageSet(topic, offset));
        }
    }
}
 
Author: YMCoding | Project: kafka-0.11.0.0-src-with-comment | Lines: 37 | Source: SimpleConsumerDemo.java


Example 18: fetchMessages

import kafka.api.FetchRequest; // import the required package/class
public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws TopicOffsetOutOfRangeException, RuntimeException {
    ByteBufferMessageSet msgs = null;
    String topic = config.topic;
    int partitionId = partition.partition;
    FetchRequestBuilder builder = new FetchRequestBuilder();
    FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
            clientId(config.clientId).maxWait(config.fetchMaxWait).build();
    FetchResponse fetchResponse;
    try {
        fetchResponse = consumer.fetch(fetchRequest);
    } catch (Exception e) {
        if (e instanceof ConnectException ||
                e instanceof SocketTimeoutException ||
                e instanceof IOException ||
                e instanceof UnresolvedAddressException
                ) {
            LOG.warn("Network error when fetching messages:", e);
            throw new FailedFetchException(e);
        } else {
            throw new RuntimeException(e);
        }
    }
    if (fetchResponse.hasError()) {
        KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
        if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
            String msg = "Got fetch request with offset out of range: [" + offset + "]";
            LOG.warn(msg);
            throw new TopicOffsetOutOfRangeException(msg);
        } else {
            String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
            LOG.error(message);
            throw new FailedFetchException(message);
        }
    } else {
        msgs = fetchResponse.messageSet(topic, partitionId);
    }
    return msgs;
}
 
Author: redBorder | Project: rb-bi | Lines: 39 | Source: KafkaUtils.java


Example 19: main

import kafka.api.FetchRequest; // import the required package/class
public static void main(String[] args) throws Exception {
    generateData();

    SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.KAFKA_SERVER_URL,
        KafkaProperties.KAFKA_SERVER_PORT,
        KafkaProperties.CONNECTION_TIMEOUT,
        KafkaProperties.KAFKA_PRODUCER_BUFFER_SIZE,
        KafkaProperties.CLIENT_ID);

    System.out.println("Testing single fetch");
    FetchRequest req = new FetchRequestBuilder()
        .clientId(KafkaProperties.CLIENT_ID)
        .addFetch(KafkaProperties.TOPIC2, 0, 0L, 100)
        .build();
    FetchResponse fetchResponse = simpleConsumer.fetch(req);
    printMessages(fetchResponse.messageSet(KafkaProperties.TOPIC2, 0));

    System.out.println("Testing single multi-fetch");
    Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();
    topicMap.put(KafkaProperties.TOPIC2, Collections.singletonList(0));
    topicMap.put(KafkaProperties.TOPIC3, Collections.singletonList(0));
    req = new FetchRequestBuilder()
        .clientId(KafkaProperties.CLIENT_ID)
        .addFetch(KafkaProperties.TOPIC2, 0, 0L, 100)
        .addFetch(KafkaProperties.TOPIC3, 0, 0L, 100)
        .build();
    fetchResponse = simpleConsumer.fetch(req);
    int fetchReq = 0;
    for (Map.Entry<String, List<Integer>> entry : topicMap.entrySet()) {
        String topic = entry.getKey();
        for (Integer offset : entry.getValue()) {
            System.out.println("Response from fetch request no: " + ++fetchReq);
            printMessages(fetchResponse.messageSet(topic, offset));
        }
    }
}
 
Author: ggj2010 | Project: javabase | Lines: 37 | Source: SimpleConsumerDemo.java


Example 20: fetchMessages

import kafka.api.FetchRequest; // import the required package/class
/**
 * Makes a call to kafka to fetch messages.
 */
private FetchResponse fetchMessages(SimpleConsumer consumer, long offset) {
  FetchRequest request = new FetchRequestBuilder()
    .clientId(consumer.clientId())
    .addFetch(topicPart.getTopic(), topicPart.getPartition(), offset, FETCH_SIZE)
    .maxWait(MAX_WAIT)
    .build();
  return consumer.fetch(request);
}
 
Author: apache | Project: twill | Lines: 12 | Source: SimpleKafkaConsumer.java



Note: The kafka.api.FetchRequest class examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The code snippets were selected from open-source projects contributed by various developers, and copyright remains with the original authors. Refer to each project's License before distributing or using the code; do not reproduce this article without permission.

