Java Node Class Code Examples

This article collects typical usage examples of the Java class org.apache.kafka.common.Node. If you have been wondering what the Node class does, how to use it, or what real-world Node code looks like, the curated examples below should help.



The Node class belongs to the org.apache.kafka.common package. Twenty code examples of the class are shown below, sorted by popularity by default.
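Before turning to the examples, here is a minimal sketch of constructing a Node and reading its fields, based only on the constructors and accessors that appear in the examples below (the host name and ports are placeholders):

import org.apache.kafka.common.Node;

public class NodeExample {
    public static void main(String[] args) {
        // Broker id 0 on localhost:9092, without rack information.
        Node plain = new Node(0, "localhost", 9092);
        // The same broker with a rack, as used by the rack-aware examples below.
        Node racked = new Node(0, "localhost", 9092, "rack0");

        System.out.println(plain.id());        // 0
        System.out.println(plain.idString());  // "0"
        System.out.println(plain.host());      // "localhost"
        System.out.println(plain.port());      // 9092
        System.out.println(racked.rack());     // "rack0"

        // Node.noNode() is a placeholder for "no node" (see Example 8).
        System.out.println(Node.noNode().isEmpty()); // true
    }
}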

Example 1: fillInMissingPartitions

import org.apache.kafka.common.Node; // import the required package/class
/**
 * Add an empty load for all partitions that exist in the current cluster but are missing from
 * the metric aggregation result.
 */
private void fillInMissingPartitions(Map<TopicPartition, Snapshot[]> loadSnapshots,
                                            Cluster kafkaCluster,
                                            ClusterModel clusterModel) throws ModelInputException {
  // There must be at least one entry, otherwise an exception would have been thrown earlier,
  // so we don't need to check hasNext() here.
  Snapshot[] snapshotsForTimestamps = loadSnapshots.values().iterator().next();
  Snapshot[] emptyLoadSnapshots = new Snapshot[snapshotsForTimestamps.length];
  for (int i = 0; i < emptyLoadSnapshots.length; i++) {
    emptyLoadSnapshots[i] = new Snapshot(snapshotsForTimestamps[i].time());
  }
  for (Node node : kafkaCluster.nodes()) {
    for (PartitionInfo partitionInfo : kafkaCluster.partitionsForNode(node.id())) {
      TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
      if (!loadSnapshots.containsKey(tp)) {
        populateSnapshots(kafkaCluster, clusterModel, tp, emptyLoadSnapshots);
      }
    }
  }
}
 
Developer: linkedin, Project: cruise-control, Lines: 24, Source: LoadMonitor.java


Example 2: testResetWhenOutOfOrderSequenceReceived

import org.apache.kafka.common.Node; // import the required package/class
@Test
public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
    final long producerId = 343434L;
    TransactionManager transactionManager = new TransactionManager();
    transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
    setupWithTransactionState(transactionManager);
    client.setNode(new Node(1, "localhost", 33343));

    int maxRetries = 10;
    Metrics m = new Metrics();
    Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
            m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);

    Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
    sender.run(time.milliseconds());  // connect.
    sender.run(time.milliseconds());  // send.

    assertEquals(1, client.inFlightRequestCount());

    client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));

    sender.run(time.milliseconds());
    assertTrue(responseFuture.isDone());
    assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 26, Source: SenderTest.java


Example 3: populateSnapshots

import org.apache.kafka.common.Node; // import the required package/class
private void populateSnapshots(Cluster kafkaCluster,
                               ClusterModel clusterModel,
                               TopicPartition tp,
                               Snapshot[] leaderLoadSnapshots) throws ModelInputException {
  PartitionInfo partitionInfo = kafkaCluster.partition(tp);
  // If partition info does not exist, the topic may have been deleted.
  if (partitionInfo != null) {
    for (Node replica : partitionInfo.replicas()) {
      boolean isLeader = partitionInfo.leader() != null && replica.id() == partitionInfo.leader().id();
      String rack = getRackHandleNull(replica);
      // Note that we assume the capacity resolver can still return the broker capacity even if the broker
      // is dead. We need this to get the host resource capacity.
      Map<Resource, Double> brokerCapacity =
          _brokerCapacityConfigResolver.capacityForBroker(rack, replica.host(), replica.id());
      clusterModel.createReplicaHandleDeadBroker(rack, replica.id(), tp, isLeader, brokerCapacity);
      // Push the load snapshot to the replica one by one.
      for (int i = 0; i < leaderLoadSnapshots.length; i++) {
        clusterModel.pushLatestSnapshot(rack, replica.id(), tp,
                                        isLeader ? leaderLoadSnapshots[i].duplicate() : MonitorUtils.toFollowerSnapshot(leaderLoadSnapshots[i]));
      }
    }
  }
}
 
Developer: linkedin, Project: cruise-control, Lines: 24, Source: LoadMonitor.java


Example 4: isReplicaMovementDone

import org.apache.kafka.common.Node; // import the required package/class
private boolean isReplicaMovementDone(Cluster cluster, TopicPartition tp, ExecutionTask task) {
  boolean destinationExists = false;
  boolean sourceExists = false;
  for (Node node : cluster.partition(tp).replicas()) {
    destinationExists = destinationExists || (node.id() == task.destinationBrokerId());
    sourceExists = sourceExists || (node.id() == task.sourceBrokerId());
  }
  switch (task.state()) {
    case IN_PROGRESS:
      return destinationExists && !sourceExists;
    case ABORTING:
      return !destinationExists && sourceExists;
    case DEAD:
      return !destinationExists && !sourceExists;
    default:
      throw new IllegalStateException("Should never be here. State " + task.state());
  }
}
 
Developer: linkedin, Project: cruise-control, Lines: 19, Source: Executor.java


Example 5: getMetadata

import org.apache.kafka.common.Node; // import the required package/class
private Metadata getMetadata(Collection<TopicPartition> partitions) {
  Node node0 = new Node(0, "localhost", 100, "rack0");
  Node node1 = new Node(1, "localhost", 100, "rack1");
  Node[] nodes = {node0, node1};
  Set<Node> allNodes = new HashSet<>();
  allNodes.add(node0);
  allNodes.add(node1);
  Set<PartitionInfo> parts = new HashSet<>();
  for (TopicPartition tp : partitions) {
    parts.add(new PartitionInfo(tp.topic(), tp.partition(), node0, nodes, nodes));
  }
  Cluster cluster = new Cluster("clusterId", allNodes, parts, Collections.emptySet(), Collections.emptySet());
  Metadata metadata = new Metadata();
  metadata.update(cluster, Collections.emptySet(), 0);
  return metadata;
}
 
Developer: linkedin, Project: cruise-control, Lines: 17, Source: MetricSampleAggregatorTest.java


Example 6: testListenerGetsNotifiedOfUpdate

import org.apache.kafka.common.Node; // import the required package/class
@Test
public void testListenerGetsNotifiedOfUpdate() {
    long time = 0;
    final Set<String> topics = new HashSet<>();
    metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
    metadata.addListener(new Metadata.Listener() {
        @Override
        public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
            topics.clear();
            topics.addAll(cluster.topics());
        }
    });

    metadata.update(new Cluster(
            null,
            Arrays.asList(new Node(0, "host1", 1000)),
            Arrays.asList(
                new PartitionInfo("topic", 0, null, null, null),
                new PartitionInfo("topic1", 0, null, null, null)),
            Collections.<String>emptySet(),
            Collections.<String>emptySet()),
        Collections.<String>emptySet(), 100);

    assertEquals("Listener did not update topics list correctly",
        new HashSet<>(Arrays.asList("topic", "topic1")), topics);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 27, Source: MetadataTest.java


Example 7: createLeaderAndIsrRequest

import org.apache.kafka.common.Node; // import the required package/class
private LeaderAndIsrRequest createLeaderAndIsrRequest() {
    Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
    List<Integer> isr = asList(1, 2);
    List<Integer> replicas = asList(1, 2, 3, 4);
    partitionStates.put(new TopicPartition("topic5", 105),
            new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas));
    partitionStates.put(new TopicPartition("topic5", 1),
            new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas));
    partitionStates.put(new TopicPartition("topic20", 1),
            new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas));

    Set<Node> leaders = Utils.mkSet(
            new Node(0, "test0", 1223),
            new Node(1, "test1", 1223)
    );

    return new LeaderAndIsrRequest.Builder(1, 10, partitionStates, leaders).build();
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 19, Source: RequestResponseTest.java


Example 8: shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores

import org.apache.kafka.common.Node; // import the required package/class
@Test
public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
    final String changelogName = "test-application-my-store-changelog";
    final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
    consumer.assign(partitions);
    final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
    committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
    consumer.commitSync(committedOffsets);

    restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
            new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream("topic").groupByKey().count("my-store");
    final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
    StreamsConfig config = createConfig(baseDir);

    new StandbyTask(taskId, applicationId, partitions, topology, consumer, changelogReader, config,
        new MockStreamsMetrics(new Metrics()), stateDirectory);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 20, Source: StandbyTaskTest.java


Example 9: testTimeoutWithoutMetadata

import org.apache.kafka.common.Node; // import the required package/class
/**
 * Test that the client properly times out when we don't receive any metadata.
 */
@Test
public void testTimeoutWithoutMetadata() throws Exception {
    try (MockKafkaAdminClientEnv env = mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().setNode(new Node(0, "localhost", 8121));
        env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
        KafkaFuture<Void> future = env.adminClient().createTopics(
                Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
                new CreateTopicsOptions().timeoutMs(1000)).all();
        assertFutureError(future, TimeoutException.class);
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 16, Source: KafkaAdminClientTest.java


Example 10: testRaiseErrorWhenNoPartitionsPendingOnDrain

import org.apache.kafka.common.Node; // import the required package/class
@Test
public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
    final long pid = 13131L;
    final short epoch = 1;
    doInitTransactions(pid, epoch);
    transactionManager.beginTransaction();
    // Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain.
    accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
    Node node1 = new Node(0, "localhost", 1111);
    PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);

    Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1),
            Collections.<String>emptySet(), Collections.<String>emptySet());
    Set<Node> nodes = new HashSet<>();
    nodes.add(node1);
    Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, nodes, Integer.MAX_VALUE,
            time.milliseconds());

    // We shouldn't drain batches which haven't been added to the transaction yet.
    assertTrue(drainedBatches.containsKey(node1.id()));
    assertTrue(drainedBatches.get(node1.id()).isEmpty());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 24, Source: TransactionManagerTest.java


Example 11: awaitReady

import org.apache.kafka.common.Node; // import the required package/class
/**
 * Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll`
 * invocations until the connection to `node` is ready, the timeout expires or the connection fails.
 *
 * It returns `true` if the call completes normally or `false` if the timeout expires. If the connection fails,
 * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive
 * connection timeout, it is possible for this method to raise an `IOException` for a previous connection which
 * has recently disconnected.
 *
 * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
 * care.
 */
public static boolean awaitReady(KafkaClient client, Node node, Time time, long timeoutMs) throws IOException {
    if (timeoutMs < 0) {
        throw new IllegalArgumentException("Timeout needs to be non-negative");
    }
    long startTime = time.milliseconds();
    long expiryTime = startTime + timeoutMs;

    if (isReady(client, node, startTime) || client.ready(node, startTime))
        return true;

    long attemptStartTime = time.milliseconds();
    while (!client.isReady(node, attemptStartTime) && attemptStartTime < expiryTime) {
        if (client.connectionFailed(node)) {
            throw new IOException("Connection to " + node + " failed.");
        }
        long pollTimeout = expiryTime - attemptStartTime;
        client.poll(pollTimeout, attemptStartTime);
        attemptStartTime = time.milliseconds();
    }
    return client.isReady(node, attemptStartTime);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 34, Source: NetworkClientUtils.java
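A hedged usage sketch of the awaitReady helper above: assuming an already-configured `KafkaClient` and a `Time` instance (all parameters here are placeholders supplied by the caller), it can be used to block until a broker connection is usable:

import java.io.IOException;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.utils.Time;

public class AwaitReadyExample {
    // Returns true once the connection to `broker` is ready, or false if timeoutMs
    // elapses first; throws IOException if the connection attempt fails.
    public static boolean connectBlocking(KafkaClient client, Node broker, Time time, long timeoutMs)
            throws IOException {
        return NetworkClientUtils.awaitReady(client, broker, time, timeoutMs);
    }
}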


Example 12: maybeUpdate

import org.apache.kafka.common.Node; // import the required package/class
@Override
// Decides whether the cluster metadata currently cached in `metadata` needs to be updated
public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;

    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
    if (metadataTimeout > 0) {
        return metadataTimeout;
    }

    // Beware that the behavior of this method and the computation of timeouts for poll() are
    // highly dependent on the behavior of leastLoadedNode.
    Node node = leastLoadedNode(now);
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        return reconnectBackoffMs;
    }

    return maybeUpdate(now, node);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 23, Source: NetworkClient.java


Example 13: getBrokerLeaderPartitions

import org.apache.kafka.common.Node; // import the required package/class
public Map<Integer, List<TopicPartition>> getBrokerLeaderPartitions(
    Map<String, List<PartitionInfo>> topicPartitonInfoMap) {
  Map<Integer, List<TopicPartition>> result = new HashMap<>();

  for (String topic : topicPartitonInfoMap.keySet()) {
    List<PartitionInfo> partitionInfoList = topicPartitonInfoMap.get(topic);
    if (partitionInfoList == null) {
      LOG.error("Failed to get partition info for {}", topic);
      continue;
    }

    for (PartitionInfo info : partitionInfoList) {
      Node leaderNode = info.leader();
      if (leaderNode != null) {
        result.putIfAbsent(leaderNode.id(), new ArrayList<>());
        TopicPartition topicPartiton = new TopicPartition(info.topic(), info.partition());
        result.get(leaderNode.id()).add(topicPartiton);
      }
    }
  }
  return result;
}
 
Developer: pinterest, Project: doctorkafka, Lines: 23, Source: KafkaClusterManager.java


Example 14: sendOffsetFetchRequest

import org.apache.kafka.common.Node; // import the required package/class
/**
 * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
 * returned future can be polled to get the actual offsets returned from the broker.
 *
 * @param partitions The set of partitions to get offsets for.
 * @return A request future containing the committed offsets.
 */
// Create and cache the OffsetFetchRequest
private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
    Node coordinator = coordinator();
    if (coordinator == null)
        return RequestFuture.coordinatorNotAvailable();

    log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
    // construct the request
    OffsetFetchRequest.Builder requestBuilder =
            new OffsetFetchRequest.Builder(this.groupId, new ArrayList<>(partitions));

    // send the request with a callback
    // Process the response with an OffsetFetchResponseHandler
    return client.send(coordinator, requestBuilder)
            .compose(new OffsetFetchResponseHandler());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 24, Source: ConsumerCoordinator.java
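The javadoc above notes that the returned future must be polled. A hedged sketch of how such a future is typically consumed, assuming the internal `ConsumerNetworkClient` and `RequestFuture` types from the same consumer-internals package (the method name is illustrative):

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.common.TopicPartition;

public class OffsetFetchExample {
    // Drive the network client until the offset-fetch future completes,
    // then unwrap the result or propagate the failure.
    public static Map<TopicPartition, OffsetAndMetadata> awaitOffsets(
            ConsumerNetworkClient client,
            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
        client.poll(future); // blocks until the future is completed
        if (future.succeeded())
            return future.value();
        throw future.exception(); // RuntimeException describing the failure
    }
}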


Example 15: testAbortRetryWhenProducerIdChanges

import org.apache.kafka.common.Node; // import the required package/class
@Test
public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
    final long producerId = 343434L;
    TransactionManager transactionManager = new TransactionManager();
    transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
    setupWithTransactionState(transactionManager);
    client.setNode(new Node(1, "localhost", 33343));

    int maxRetries = 10;
    Metrics m = new Metrics();
    Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
            m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);

    Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
    sender.run(time.milliseconds());  // connect.
    sender.run(time.milliseconds());  // send.
    String id = client.requests().peek().destination();
    Node node = new Node(Integer.valueOf(id), "localhost", 0);
    assertEquals(1, client.inFlightRequestCount());
    assertTrue("Client ready status should be true", client.isReady(node, 0L));
    client.disconnect(id);
    assertEquals(0, client.inFlightRequestCount());
    assertFalse("Client ready status should be false", client.isReady(node, 0L));

    transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0));
    sender.run(time.milliseconds()); // receive error
    sender.run(time.milliseconds()); // reconnect
    sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
    assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());

    KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, ""));
    assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);

    assertTrue(responseFuture.isDone());
    assertEquals((long) transactionManager.sequenceNumber(tp0), 0L);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 37, Source: SenderTest.java


Example 16: FindCoordinatorResponse

import org.apache.kafka.common.Node; // import the required package/class
public FindCoordinatorResponse(Struct struct) {
    this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
    error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
    if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
        errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME);
    else
        errorMessage = null;

    Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
    int nodeId = broker.getInt(NODE_ID_KEY_NAME);
    String host = broker.getString(HOST_KEY_NAME);
    int port = broker.getInt(PORT_KEY_NAME);
    node = new Node(nodeId, host, port);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 15, Source: FindCoordinatorResponse.java


Example 17: testPartitioner

import org.apache.kafka.common.Node; // import the required package/class
@Test
public void testPartitioner() throws Exception {
    PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
    PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
    Cluster cluster = new Cluster(null, new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1),
            Collections.<String>emptySet(), Collections.<String>emptySet());
    MockProducer<String, String> producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "value");
    Future<RecordMetadata> metadata = producer.send(record);
    assertEquals("Partition should be correct", 1, metadata.get().partition());
    producer.clear();
    assertEquals("Clear should erase our history", 0, producer.history().size());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 14, Source: MockProducerTest.java


Example 18: getCluster

import org.apache.kafka.common.Node; // import the required package/class
private Cluster getCluster() {
  Node node0 = new Node(BROKER_ID_0, "localhost", 100, "rack0");
  Node node1 = new Node(BROKER_ID_1, "localhost", 100, "rack1");
  Node[] nodes = {node0, node1};
  Set<Node> allNodes = new HashSet<>();
  allNodes.add(node0);
  allNodes.add(node1);
  Set<PartitionInfo> parts = new HashSet<>();
  parts.add(new PartitionInfo(TOPIC1, P0, node0, nodes, nodes));
  parts.add(new PartitionInfo(TOPIC1, P1, node1, nodes, nodes));
  parts.add(new PartitionInfo(TOPIC2, P0, node0, nodes, nodes));
  parts.add(new PartitionInfo(TOPIC2, P1, node0, nodes, nodes));
  return new Cluster("testCluster", allNodes, parts, Collections.emptySet(), Collections.emptySet());
}
 
Developer: linkedin, Project: cruise-control, Lines: 15, Source: CruiseControlMetricsProcessorTest.java


Example 19: respondFrom

import org.apache.kafka.common.Node; // import the required package/class
public void respondFrom(AbstractResponse response, Node node, boolean disconnected) {
    Iterator<ClientRequest> iterator = requests.iterator();
    while (iterator.hasNext()) {
        ClientRequest request = iterator.next();
        if (request.destination().equals(node.idString())) {
            iterator.remove();
            short version = request.requestBuilder().desiredOrLatestVersion();
            responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                    request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
            return;
        }
    }
    throw new IllegalArgumentException("No requests available to node " + node);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 15, Source: MockClient.java


Example 20: testSendInOrder

import org.apache.kafka.common.Node; // import the required package/class
@Test
public void testSendInOrder() throws Exception {
    int maxRetries = 1;
    Metrics m = new Metrics();
    try {
        Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                m, time, REQUEST_TIMEOUT, 50, null, apiVersions);
        // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
        Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
        metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());

        // Send the first message.
        TopicPartition tp2 = new TopicPartition("test", 1);
        accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
        sender.run(time.milliseconds()); // connect
        sender.run(time.milliseconds()); // send produce request
        String id = client.requests().peek().destination();
        assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
        Node node = new Node(Integer.parseInt(id), "localhost", 0);
        assertEquals(1, client.inFlightRequestCount());
        assertTrue(client.hasInFlightRequests());
        assertTrue("Client ready status should be true", client.isReady(node, 0L));

        time.sleep(900);
        // Now send another message to tp2
        accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, MAX_BLOCK_TIMEOUT);

        // Update metadata before the sender receives the response from broker 0. Now tp2 (partition 1) moves to broker 0.
        Cluster cluster2 = TestUtils.singletonCluster("test", 2);
        metadata.update(cluster2, Collections.<String>emptySet(), time.milliseconds());
        // Sender should not send the second message to node 0.
        sender.run(time.milliseconds());
        assertEquals(1, client.inFlightRequestCount());
        assertTrue(client.hasInFlightRequests());
    } finally {
        m.close();
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 39, Source: SenderTest.java



Note: The org.apache.kafka.common.Node class examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by their respective communities, and the copyright of the source code belongs to the original authors; please consult each project's license before distributing or using the code, and do not reproduce this article without permission.

