Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


spring-kafka Connection to node -1 (/192.168.xx.xx:9092) could not be established. Broker may not be available

My Spring Boot 2.4.1 application runs on localhost (192.168.189.115); Kafka 2.13-2.6.0 runs on 192.168.48.54:9092.

I can post a message to the Kafka producer via http://localhost:8010/kafka/publish?message=HelloKafka, and the request succeeds.

But the consumer gets the error: Connection to node -1 (/192.168.48.54:9092) could not be established. Broker may not be available.

I tried changing server.properties to

listeners=PLAINTEXT://192.168.48.54:9092
advertised.listeners=PLAINTEXT://192.168.48.54:9092

or (I also tried commenting out both lines)

listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.48.54:9092

application.properties

server.port=8010
spring.kafka.bootstrap-servers=192.168.48.54:9092
spring.kafka.consumer.group-id=fm-group

KafkaController.java

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
    
    @Autowired
    private Producer producer;

    @GetMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.produce(message);
    }
    
}

Producer.java

@Service
public class Producer {
    
    private static final Logger logger = LogManager.getFormatterLogger(Producer.class);
    private static String TOPIC = "customer.topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void produce(String data) {
        logger.info("Produce Topic: %s - Message: %s", TOPIC, data);
        this.kafkaTemplate.send(TOPIC, data);
    }
    
}

Consumer.java

@Service
public class Consumer {
    
    private static final Logger logger = LogManager.getFormatterLogger(Consumer.class);

    @KafkaListener(topics = "customer.topic", groupId = "fm-group")
    public void consume(String message) throws IOException {
        logger.info("Consume Message: %s", message);
    }
    
}

From the Kafka server I can ping my IP (192.168.189.115). I don't know why the consumer's connection could not be established. I have already tried every solution I found on Stack Overflow. Please help me.
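A successful ping only shows that the two hosts can reach each other over ICMP; it does not show that TCP port 9092 on the broker accepts connections from the application host. A minimal sketch of such a check, using only java.net sockets. The class and method names are hypothetical and not part of the project above; the default host and port are the ones from the question.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the original project.
class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable/routable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the broker address from the question; override via arguments.
        String host = args.length > 0 ? args[0] : "192.168.48.54";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.printf("%s:%d reachable: %b%n", host, port, canConnect(host, port, 3000));
    }
}
```

If this reports false when run on the application host, the problem is likely at the network or listener level (firewall, bind address) rather than in the Spring configuration.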

EDIT #1: I changed Producer.java to

    public void produce(String data) {
        logger.info("Produce Topic: %s - Message: %s", TOPIC, data);
        try {
            ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(TOPIC, data);
            logger.info("test");
            // Block for up to 10 seconds so that send failures surface here
            SendResult<String, String> sendResult = future.get(10, TimeUnit.SECONDS);
            // The formatter logger needs a %s placeholder to print the argument
            logger.info("sendResult %s", sendResult.getRecordMetadata());
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

and I got these error messages:

org.apache.kafka.common.errors.TimeoutException: Topic customer.topic not present in metadata after 60000 ms.

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic customer.topic not present in metadata after 60000 ms.

It appears that I can't send messages either. Why can't I connect? Please help me.

I ran ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe and got:

Topic: customer.topic   PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: customer.topic   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Topic: streams-wordcount-output PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,segment.bytes=1073741824

EDIT #2: I also added the following to pom.xml, as suggested in "org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 60000 ms",

but it still doesn't work.

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
</dependency>


1 Answer


You shouldn't bind to an actual IP, as that restricts traffic to only that address.

The following opens up the server to accept incoming connections on port 9092 on all interfaces:

listeners=PLAINTEXT://0.0.0.0:9092
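To illustrate the difference between binding to a specific address and binding to the wildcard address, here is a minimal sketch with plain Java sockets. It is not Kafka code; the class and method names are hypothetical, and it only shows how the operating system treats the two bind addresses. Note that with a 0.0.0.0 listener, advertised.listeners still has to name an address that clients can actually reach, since 0.0.0.0 cannot be advertised.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical demo class; it only illustrates bind-address semantics.
class BindAddressDemo {

    /** Opens a listening socket on an ephemeral port bound to the given address. */
    static ServerSocket listenOn(String bindAddress) throws IOException {
        // Port 0 lets the OS pick a free port; 50 is the accept backlog.
        return new ServerSocket(0, 50, InetAddress.getByName(bindAddress));
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket wildcard = listenOn("0.0.0.0");
             ServerSocket loopback = listenOn("127.0.0.1")) {
            // A wildcard-bound socket accepts connections arriving on any local interface.
            System.out.println("0.0.0.0 is wildcard: "
                    + wildcard.getInetAddress().isAnyLocalAddress());
            // A socket bound to one address accepts only connections sent to that address.
            System.out.println("127.0.0.1 is wildcard: "
                    + loopback.getInetAddress().isAnyLocalAddress());
        }
    }
}
```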
