my Spring boot 2.4.1 run in localhost (192.168.189.115), kafka 2.13-2.6.0 run in 192.168.48.54:9092
I can post message to producer kafka with http://localhost:8010/kafka/publish?message=HelloKafka success.
but consumer got error Connection to node -1 (/192.168.48.54:9092) could not be established. Broker may not be available.
I try to change server.properties
listeners=PLAINTEXT://192.168.48.54:9092
advertised.listeners=PLAINTEXT://192.168.48.54:9092
or (also commented it both)
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.48.54:9092
application.properties
server.port=8010
spring.kafka.bootstrap-servers=192.168.48.54:9092
spring.kafka.consumer.group-id=fm-group
KafKaController.java
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
@Autowired
private Producer producer;
@GetMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
this.producer.produce(message);
}
}
Producer.java
@Service
public class Producer {
private static final Logger logger = LogManager.getFormatterLogger(Producer.class);
private static String TOPIC = "customer.topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void produce(String data) {
logger.info("Produce Topic: %s - Message: %s", TOPIC, data);
this.kafkaTemplate.send(TOPIC, data);
}
}
Consumer.java
@Service
public class Consumer {
private static final Logger logger = LogManager.getFormatterLogger(Consumer.class);
@KafkaListener(topics = "customer.topic", groupId = "fm-group")
public void consume(String message) throws IOException {
logger.info("Consume Message: %s", message);
}
}
in kafka server i can ping my ip (192.168.189.115). I don't know why consumer could not be established.
I try all resolution in stackoverflow already. Please help me.
EDIT#1 I changed Producer.java to
public void produce(String data) {
logger.info("Produce Topic: %s - Message: %s", TOPIC, data);
try {
ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(TOPIC, data);
logger.info("test");
SendResult<String, String> sendResult = future.get(10, TimeUnit.SECONDS);
logger.info("sendResult ", sendResult.getRecordMetadata());
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
i got message reply
org.apache.kafka.common.errors.TimeoutException: Topic customer.topic not present in metadata after 60000 ms.
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic customer.topic not present in metadata after 60000 ms.
it appear that i can't send message too. Why i can't connect ? please help me
i use ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: customer.topic PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: customer.topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: streams-wordcount-output PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact,segment.bytes=1073741824
EDIT#2 i also put this in pom.xml as suggestd in
org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 60000 ms
but it still doesn' work
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>