As far as I can tell from going through the source code, you can't change the topic of a running container. So you'll want to stop the current container and create a new one.
I'd advise against using the registry in this case and suggest managing the container yourself, because it seems you can't remove containers from the registry, so you would end up with a memory leak.
You can autowire a KafkaListenerContainerFactory yourself. This factory requires an endpoint. I must admit that setting up the endpoint seemed a bit painful to me if you just want to change the topic and have a callback invoked, because all available implementations rely on metaprogramming with bean and method references.
The following snippet should get you started, although it might need some more tweaking.
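import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

@SpringBootApplication
@EnableKafka
public class KafkaDemoApplication {
    private KafkaListenerContainerFactory<?> factory;

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

    @Autowired
    public void setFactory(KafkaListenerContainerFactory<?> factory) {
        this.factory = factory;
    }

    @EventListener(classes = {ApplicationStartedEvent.class})
    public void onStarted() throws InterruptedException, NoSuchMethodException {
        // Listen on the first topic for a while, then stop that container.
        var listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_3"));
        listenerContainer.start();
        Thread.sleep(2000);
        listenerContainer.stop();
        // "Change" the topic by creating a fresh container for the new topic.
        listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_4"));
        listenerContainer.start();
        Thread.sleep(2000);
        listenerContainer.stop();
    }

    // Builds an endpoint that routes records from the given topic to onMessage.
    private KafkaListenerEndpoint getEndpoint(String topic) throws NoSuchMethodException {
        var listenerEndpoint = new MethodKafkaListenerEndpoint<String, String>();
        listenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        listenerEndpoint.setBean(this);
        listenerEndpoint.setMethod(getClass().getMethod("onMessage", String.class, String.class));
        listenerEndpoint.setTopics(topic);
        return listenerEndpoint;
    }

    // Note: the parameters are unannotated; binding the record key may need
    // an annotation such as @Header, depending on your setup.
    public void onMessage(String key, String value) {
        System.out.println(key + ":" + value);
    }
}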
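If you switch topics more than once, it may help to keep the stop-and-recreate logic in one place. The following is only a minimal sketch of that idea in plain Java, not something Spring Kafka provides: TopicSwitcher and ManagedContainer are hypothetical names I made up for illustration. In a real application the factory function would presumably wrap factory.createListenerContainer(getEndpoint(topic)) from the snippet above and adapt the returned container's start() and stop().

```java
import java.util.function.Function;

// Hypothetical minimal lifecycle; a Spring listener container would be
// adapted to this by delegating to its own start() and stop().
interface ManagedContainer {
    void start();
    void stop();
}

// Hypothetical helper that keeps at most one container alive at a time.
final class TopicSwitcher {
    private final Function<String, ManagedContainer> containerFactory;
    private ManagedContainer current;
    private String currentTopic;

    TopicSwitcher(Function<String, ManagedContainer> containerFactory) {
        this.containerFactory = containerFactory;
    }

    // Stops the running container (if any) and starts a new one for the topic.
    synchronized void switchTo(String topic) {
        if (topic.equals(currentTopic)) {
            return; // already listening on this topic
        }
        stop();
        ManagedContainer next = containerFactory.apply(topic);
        next.start();
        current = next;
        currentTopic = topic;
    }

    // Stops the running container and drops the reference so it can be collected.
    synchronized void stop() {
        if (current != null) {
            current.stop();
            current = null;
            currentTopic = null;
        }
    }

    synchronized String currentTopic() {
        return currentTopic;
    }
}

final class TopicSwitcherDemo {
    public static void main(String[] args) {
        // Stand-in containers that only log their lifecycle calls.
        TopicSwitcher switcher = new TopicSwitcher(topic -> new ManagedContainer() {
            @Override public void start() { System.out.println("start " + topic); }
            @Override public void stop() { System.out.println("stop " + topic); }
        });
        switcher.switchTo("my_topic_3");
        switcher.switchTo("my_topic_4");
        switcher.stop();
    }
}
```

Because the switcher holds the only reference to the container and clears it on stop, nothing keeps old containers alive, which is the point of not going through the registry.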
As a side note, you can implement KafkaListenerConfigurer if you want access to the registry, because it's not autowireable. But again, don't use it if you want to kill your containers, because you can't remove the references, as far as I know.