本文整理汇总了Java中org.apache.kafka.common.Node类的典型用法代码示例。如果您正苦于以下问题:Java Node类的具体用法?Java Node怎么用?Java Node使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Node类属于org.apache.kafka.common包,在下文中一共展示了Node类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: fillInMissingPartitions
import org.apache.kafka.common.Node; //导入依赖的package包/类
/**
* Add empty load of all the partitions that exists in the current cluster but missing from the
* metric aggregation result.
*/
private void fillInMissingPartitions(Map<TopicPartition, Snapshot[]> loadSnapshots,
Cluster kafkaCluster,
ClusterModel clusterModel) throws ModelInputException {
// There must be at least one entry, otherwise there will be exception thrown earlier. So we don't need to
// check if it has next
Snapshot[] snapshotsForTimestamps = loadSnapshots.values().iterator().next();
Snapshot[] emptyLoadSnapshots = new Snapshot[snapshotsForTimestamps.length];
for (int i = 0; i < emptyLoadSnapshots.length; i++) {
emptyLoadSnapshots[i] = new Snapshot(snapshotsForTimestamps[i].time());
}
for (Node node : kafkaCluster.nodes()) {
for (PartitionInfo partitionInfo : kafkaCluster.partitionsForNode(node.id())) {
TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
if (!loadSnapshots.containsKey(tp)) {
populateSnapshots(kafkaCluster, clusterModel, tp, emptyLoadSnapshots);
}
}
}
}
开发者ID:linkedin,项目名称:cruise-control,代码行数:24,代码来源:LoadMonitor.java
示例2: testResetWhenOutOfOrderSequenceReceived
import org.apache.kafka.common.Node; //导入依赖的package包/类
@Test
public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
setupWithTransactionState(transactionManager);
client.setNode(new Node(1, "localhost", 33343));
int maxRetries = 10;
Metrics m = new Metrics();
Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect.
sender.run(time.milliseconds()); // send.
assertEquals(1, client.inFlightRequestCount());
client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
sender.run(time.milliseconds());
assertTrue(responseFuture.isDone());
assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:26,代码来源:SenderTest.java
示例3: populateSnapshots
import org.apache.kafka.common.Node; //导入依赖的package包/类
private void populateSnapshots(Cluster kafkaCluster,
ClusterModel clusterModel,
TopicPartition tp,
Snapshot[] leaderLoadSnapshots) throws ModelInputException {
PartitionInfo partitionInfo = kafkaCluster.partition(tp);
// If partition info does not exist, the topic may have been deleted.
if (partitionInfo != null) {
for (Node replica : partitionInfo.replicas()) {
boolean isLeader = partitionInfo.leader() != null && replica.id() == partitionInfo.leader().id();
String rack = getRackHandleNull(replica);
// Note that we assume the capacity resolver can still return the broker capacity even if the broker
// is dead. We need this to get the host resource capacity.
Map<Resource, Double> brokerCapacity =
_brokerCapacityConfigResolver.capacityForBroker(rack, replica.host(), replica.id());
clusterModel.createReplicaHandleDeadBroker(rack, replica.id(), tp, isLeader, brokerCapacity);
// Push the load snapshot to the replica one by one.
for (int i = 0; i < leaderLoadSnapshots.length; i++) {
clusterModel.pushLatestSnapshot(rack, replica.id(), tp,
isLeader ? leaderLoadSnapshots[i].duplicate() : MonitorUtils.toFollowerSnapshot(leaderLoadSnapshots[i]));
}
}
}
}
开发者ID:linkedin,项目名称:cruise-control,代码行数:24,代码来源:LoadMonitor.java
示例4: isReplicaMovementDone
import org.apache.kafka.common.Node; //导入依赖的package包/类
private boolean isReplicaMovementDone(Cluster cluster, TopicPartition tp, ExecutionTask task) {
boolean destinationExists = false;
boolean sourceExists = false;
for (Node node : cluster.partition(tp).replicas()) {
destinationExists = destinationExists || (node.id() == task.destinationBrokerId());
sourceExists = sourceExists || (node.id() == task.sourceBrokerId());
}
switch (task.state()) {
case IN_PROGRESS:
return destinationExists && !sourceExists;
case ABORTING:
return !destinationExists && sourceExists;
case DEAD:
return !destinationExists && !sourceExists;
default:
throw new IllegalStateException("Should never be here. State " + task.state());
}
}
开发者ID:linkedin,项目名称:cruise-control,代码行数:19,代码来源:Executor.java
示例5: getMetadata
import org.apache.kafka.common.Node; //导入依赖的package包/类
private Metadata getMetadata(Collection<TopicPartition> partitions) {
Node node0 = new Node(0, "localhost", 100, "rack0");
Node node1 = new Node(1, "localhost", 100, "rack1");
Node[] nodes = {node0, node1};
Set<Node> allNodes = new HashSet<>();
allNodes.add(node0);
allNodes.add(node1);
Set<PartitionInfo> parts = new HashSet<>();
for (TopicPartition tp : partitions) {
parts.add(new PartitionInfo(tp.topic(), tp.partition(), node0, nodes, nodes));
}
Cluster cluster = new Cluster("clusterId", allNodes, parts, Collections.emptySet(), Collections.emptySet());
Metadata metadata = new Metadata();
metadata.update(cluster, Collections.emptySet(), 0);
return metadata;
}
开发者ID:linkedin,项目名称:cruise-control,代码行数:17,代码来源:MetricSampleAggregatorTest.java
示例6: testListenerGetsNotifiedOfUpdate
import org.apache.kafka.common.Node; //导入依赖的package包/类
@Test
public void testListenerGetsNotifiedOfUpdate() {
long time = 0;
final Set<String> topics = new HashSet<>();
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
topics.clear();
topics.addAll(cluster.topics());
}
});
metadata.update(new Cluster(
null,
Arrays.asList(new Node(0, "host1", 1000)),
Arrays.asList(
new PartitionInfo("topic", 0, null, null, null),
new PartitionInfo("topic1", 0, null, null, null)),
Collections.<String>emptySet(),
Collections.<String>emptySet()),
Collections.<String>emptySet(), 100);
assertEquals("Listener did not update topics list correctly",
new HashSet<>(Arrays.asList("topic", "topic1")), topics);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:27,代码来源:MetadataTest.java
示例7: createLeaderAndIsrRequest
import org.apache.kafka.common.Node; //导入依赖的package包/类
private LeaderAndIsrRequest createLeaderAndIsrRequest() {
Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = asList(1, 2);
List<Integer> replicas = asList(1, 2, 3, 4);
partitionStates.put(new TopicPartition("topic5", 105),
new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas));
partitionStates.put(new TopicPartition("topic5", 1),
new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas));
partitionStates.put(new TopicPartition("topic20", 1),
new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas));
Set<Node> leaders = Utils.mkSet(
new Node(0, "test0", 1223),
new Node(1, "test1", 1223)
);
return new LeaderAndIsrRequest.Builder(1, 10, partitionStates, leaders).build();
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:RequestResponseTest.java
示例8: shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores
import org.apache.kafka.common.Node; //导入依赖的package包/类
@Test
public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
final String changelogName = "test-application-my-store-changelog";
final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
consumer.assign(partitions);
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
consumer.commitSync(committedOffsets);
restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
final KStreamBuilder builder = new KStreamBuilder();
builder.stream("topic").groupByKey().count("my-store");
final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
StreamsConfig config = createConfig(baseDir);
new StandbyTask(taskId, applicationId, partitions, topology, consumer, changelogReader, config,
new MockStreamsMetrics(new Metrics()), stateDirectory);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:StandbyTaskTest.java
示例9: testTimeoutWithoutMetadata
import org.apache.kafka.common.Node; //导入依赖的package包/类
/**
* Test that the client properly times out when we don't receive any metadata.
*/
@Test
public void testTimeoutWithoutMetadata() throws Exception {
try (MockKafkaAdminClientEnv env = mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().setNode(new Node(0, "localhost", 8121));
env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
new CreateTopicsOptions().timeoutMs(1000)).all();
assertFutureError(future, TimeoutException.class);
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:16,代码来源:KafkaAdminClientTest.java
示例10: testRaiseErrorWhenNoPartitionsPendingOnDrain
import org.apache.kafka.common.Node; //导入依赖的package包/类
@Test
public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
// Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain.
accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
Node node1 = new Node(0, "localhost", 1111);
PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1),
Collections.<String>emptySet(), Collections.<String>emptySet());
Set<Node> nodes = new HashSet<>();
nodes.add(node1);
Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, nodes, Integer.MAX_VALUE,
time.milliseconds());
// We shouldn't drain batches which haven't been added to the transaction yet.
assertTrue(drainedBatches.containsKey(node1.id()));
assertTrue(drainedBatches.get(node1.id()).isEmpty());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:24,代码来源:TransactionManagerTest.java
示例11: awaitReady
import org.apache.kafka.common.Node; //导入依赖的package包/类
/**
* Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll`
* invocations until the connection to `node` is ready, the timeoutMs expires or the connection fails.
*
* It returns `true` if the call completes normally or `false` if the timeoutMs expires. If the connection fails,
* an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive
* connection timeoutMs, it is possible for this method to raise an `IOException` for a previous connection which
* has recently disconnected.
*
* This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
* care.
*/
public static boolean awaitReady(KafkaClient client, Node node, Time time, long timeoutMs) throws IOException {
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout needs to be greater than 0");
}
long startTime = time.milliseconds();
long expiryTime = startTime + timeoutMs;
if (isReady(client, node, startTime) || client.ready(node, startTime))
return true;
long attemptStartTime = time.milliseconds();
while (!client.isReady(node, attemptStartTime) && attemptStartTime < expiryTime) {
if (client.connectionFailed(node)) {
throw new IOException("Connection to " + node + " failed.");
}
long pollTimeout = expiryTime - attemptStartTime;
client.poll(pollTimeout, attemptStartTime);
attemptStartTime = time.milliseconds();
}
return client.isReady(node, attemptStartTime);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:34,代码来源:NetworkClientUtils.java
示例12: maybeUpdate
import org.apache.kafka.common.Node; //导入依赖的package包/类
@Override
// 用来判断当前metadata中保存的集群元数据是否需要更新
public long maybeUpdate(long now) {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
if (metadataTimeout > 0) {
return metadataTimeout;
}
// Beware that the behavior of this method and the computation of timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
Node node = leastLoadedNode(now);
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
return reconnectBackoffMs;
}
return maybeUpdate(now, node);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:23,代码来源:NetworkClient.java
示例13: getBrokerLeaderPartitions
import org.apache.kafka.common.Node; //导入依赖的package包/类
public Map<Integer, List<TopicPartition>> getBrokerLeaderPartitions(
Map<String, List<PartitionInfo>> topicPartitonInfoMap) {
Map<Integer, List<TopicPartition>> result = new HashMap<>();
for (String topic : topicPartitonInfoMap.keySet()) {
List<PartitionInfo> partitionInfoList = topicPartitonInfoMap.get(topic);
if (partitionInfoList == null) {
LOG.error("Failed to get partition info for {}", topic);
continue;
}
for (PartitionInfo info : partitionInfoList) {
Node leaderNode = info.leader();
if (leaderNode != null) {
result.putIfAbsent(leaderNode.id(), new ArrayList<>());
TopicPartition topicPartiton = new TopicPartition(info.topic(), info.partition());
result.get(leaderNode.id()).add(topicPartiton);
}
}
}
return result;
}
开发者ID:pinterest,项目名称:doctorkafka,代码行数:23,代码来源:KafkaClusterManager.java
示例14: sendOffsetFetchRequest
import org.apache.kafka.common.Node; //导入依赖的package包/类
/**
* Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
* returned future can be polled to get the actual offsets returned from the broker.
*
* @param partitions The set of partitions to get offsets for.
* @return A request future containing the committed offsets.
*/
// 创建并缓存OffsetFethcRequest
private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
Node coordinator = coordinator();
if (coordinator == null)
return RequestFuture.coordinatorNotAvailable();
log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
// construct the request
OffsetFetchRequest.Builder requestBuilder =
new OffsetFetchRequest.Builder(this.groupId, new ArrayList<>(partitions));
// send the request with a callback
// 使用OffsetFetchResponseHandler来处理响应
return client.send(coordinator, requestBuilder)
.compose(new OffsetFetchResponseHandler());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:24,代码来源:ConsumerCoordinator.java
示例15: testAbortRetryWhenProducerIdChanges
import org.apache.kafka.common.Node; //导入依赖的package包/类
@Test
public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
setupWithTransactionState(transactionManager);
client.setNode(new Node(1, "localhost", 33343));
int maxRetries = 10;
Metrics m = new Metrics();
Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect.
sender.run(time.milliseconds()); // send.
String id = client.requests().peek().destination();
Node node = new Node(Integer.valueOf(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue("Client ready status should be true", client.isReady(node, 0L));
client.disconnect(id);
assertEquals(0, client.inFlightRequestCount());
assertFalse("Client ready status should be false", client.isReady(node, 0L));
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0));
sender.run(time.milliseconds()); // receive error
sender.run(time.milliseconds()); // reconnect
sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());
KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, ""));
assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);
assertTrue(responseFuture.isDone());
assertEquals((long) transactionManager.sequenceNumber(tp0), 0L);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:37,代码来源:SenderTest.java
示例16: FindCoordinatorResponse
import org.apache.kafka.common.Node; //导入依赖的package包/类
public FindCoordinatorResponse(Struct struct) {
this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME);
else
errorMessage = null;
Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
int nodeId = broker.getInt(NODE_ID_KEY_NAME);
String host = broker.getString(HOST_KEY_NAME);
int port = broker.getInt(PORT_KEY_NAME);
node = new Node(nodeId, host, port);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:FindCoordinatorResponse.java
示例17: testPartitioner
import org.apache.kafka.common.Node; //导入依赖的package包/类
@Test
public void testPartitioner() throws Exception {
PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
Cluster cluster = new Cluster(null, new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1),
Collections.<String>emptySet(), Collections.<String>emptySet());
MockProducer<String, String> producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "value");
Future<RecordMetadata> metadata = producer.send(record);
assertEquals("Partition should be correct", 1, metadata.get().partition());
producer.clear();
assertEquals("Clear should erase our history", 0, producer.history().size());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:MockProducerTest.java
示例18: getCluster
import org.apache.kafka.common.Node; //导入依赖的package包/类
private Cluster getCluster() {
Node node0 = new Node(BROKER_ID_0, "localhost", 100, "rack0");
Node node1 = new Node(BROKER_ID_1, "localhost", 100, "rack1");
Node[] nodes = {node0, node1};
Set<Node> allNodes = new HashSet<>();
allNodes.add(node0);
allNodes.add(node1);
Set<PartitionInfo> parts = new HashSet<>();
parts.add(new PartitionInfo(TOPIC1, P0, node0, nodes, nodes));
parts.add(new PartitionInfo(TOPIC1, P1, node1, nodes, nodes));
parts.add(new PartitionInfo(TOPIC2, P0, node0, nodes, nodes));
parts.add(new PartitionInfo(TOPIC2, P1, node0, nodes, nodes));
return new Cluster("testCluster", allNodes, parts, Collections.emptySet(), Collections.emptySet());
}
开发者ID:linkedin,项目名称:cruise-control,代码行数:15,代码来源:CruiseControlMetricsProcessorTest.java
示例19: respondFrom
import org.apache.kafka.common.Node; //导入依赖的package包/类
public void respondFrom(AbstractResponse response, Node node, boolean disconnected) {
Iterator<ClientRequest> iterator = requests.iterator();
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
if (request.destination().equals(node.idString())) {
iterator.remove();
short version = request.requestBuilder().desiredOrLatestVersion();
responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
return;
}
}
throw new IllegalArgumentException("No requests available to node " + node);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:MockClient.java
示例20: testSendInOrder
import org.apache.kafka.common.Node; //导入依赖的package包/类
@Test
public void testSendInOrder() throws Exception {
int maxRetries = 1;
Metrics m = new Metrics();
try {
Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, time, REQUEST_TIMEOUT, 50, null, apiVersions);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
// Send the first message.
TopicPartition tp2 = new TopicPartition("test", 1);
accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
String id = client.requests().peek().destination();
assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
Node node = new Node(Integer.parseInt(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.hasInFlightRequests());
assertTrue("Client ready status should be true", client.isReady(node, 0L));
time.sleep(900);
// Now send another message to tp2
accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
// Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
Cluster cluster2 = TestUtils.singletonCluster("test", 2);
metadata.update(cluster2, Collections.<String>emptySet(), time.milliseconds());
// Sender should not send the second message to node 0.
sender.run(time.milliseconds());
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.hasInFlightRequests());
} finally {
m.close();
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:39,代码来源:SenderTest.java
注:本文中的org.apache.kafka.common.Node类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论