Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
234 views
in Technique[技术] by (71.8m points)

java - Kafka 0.8.2.2 - Unable to publish messages

We have written a java client for publishing message to kafka. The code is as shown below

Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "202.xx.xx.xxx:9092");
props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG,Integer.toString(5 * 1000));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//1. create KafkaProducer
KafkaProducer producer = new KafkaProducer(props);

//2 create callback
Callback callback = new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
    System.out.println("Error while sending data");
    if (e != null);
    e.printStackTrace();
}
};
producer.send(record, callback);

When we execute this code , we get the following message and exception

    ProducerConfig values: 
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 5000
acks = 1
batch.size = 16384
reconnect.backoff.ms = 10
bootstrap.servers = [202.xx.xx.xx:9092]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
retries = 0
max.request.size = 1048576
block.on.buffer.full = true
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms = 0
client.id = 

Updated cluster metadata version 1 to Cluster(nodes = [Node(202.xx.xx.xx, 9092)], partitions = [])
Starting Kafka producer I/O thread.
The configuration metadata.broker.list = null was supplied but isn't a known config.
The configuration request.required.acks = null was supplied but isn't a known config.
Kafka producer started
Trying to send metadata request to node -1
Init connection to node -1 for sending metadata request in the next iteration
Initiating connection to node -1 at 202.xx.xx.xx:9092.
Trying to send metadata request to node -1
Completed connection to node -1
Trying to send metadata request to node -1
Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header=           {api_key=3,api_version=0,correlation_id=0,client_id=producer-1}, body={topics=[HelloWorld]})) to node -1
Updated cluster metadata version 2 to Cluster(nodes = [Node(0, 192.local, 9092)], partitions = [Partition(topic = HelloWorld, partition = 0, leader = 0,            replicas = [0,], isr = [0,]])

Initiating connection to node 0 at 192.local:9092.
0  max latency = 219 ms, avg latency = 0.00022
1 records sent in 219 ms ms. 4.57 records per second (0.00 mb/sec).Error connecting to node 0 at 192.local:9092:
java.io.IOException: Can't resolve address: 192.local:9092
at org.apache.kafka.common.network.Selector.connect(Selector.java:138)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:417)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:116)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:165)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Unknown Source)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Unknown Source)
at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
at org.apache.kafka.common.network.Selector.connect(Selector.java:135)
... 5 more
Beginning shutdown of Kafka producer I/O thread, sending remaining records.
Initiating connection to node 0 at 192.local:9092.
Error connecting to node 0 at 192.local:9092:
java.io.IOException: Can't resolve address: 192.local:9092
at org.apache.kafka.common.network.Selector.connect(Selector.java:138)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:417)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:116)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:165)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
at java.lang.Thread.run(Unknown Source)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Unknown Source)
at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
at org.apache.kafka.common.network.Selector.connect(Selector.java:135)
... 5 more
Give up sending metadata request since no node is available

This happens in a infinite loop and the application hangs... When we checked the kafka broker , we found that the topic was created... but we did not get the message... We have been stuck on this for a while... Please help

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

We finally figured out the issue... We were running kafka in a hybrid evironment as mentioned in the following post -

https://medium.com/@thedude_rog/running-kafka-in-a-hybrid-cloud-environment-17a8f3cfc284

We changed the host.name to the internal IP and advertised.host.name to external IP


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...