I have two Kafka consumers, ConsumerA and ConsumerB. I want to run them independently of each other on the same machine. There is no relation between them at all: each consumer works on different topics.
- Each consumer should have its own Properties object.
- Each consumer should have its own thread pool configuration, since each one can be run multithreaded (as a consumer group) if needed, independently of the other consumer.
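To make the first requirement concrete, here is a minimal sketch of how the two Properties objects might be built. The helper names mirror the `getConsumerPropsA()` and `getConsumerPropsB()` calls used later in this post, but their bodies are not shown anywhere in the question, so everything here is an assumption: the broker address `localhost:9092` and the group ids `group-a` and `group-b` are placeholders.

```java
import java.util.Properties;

public class ConsumerPropsExample {

    // Hypothetical helper: returns a fresh Properties object on every call,
    // so no two consumers ever share configuration state.
    static Properties baseProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", groupId);                   // separate group per consumer
        props.put("enable.auto.commit", "false");         // offsets are committed manually
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    static Properties getConsumerPropsA() {
        return baseProps("group-a"); // placeholder group id
    }

    static Properties getConsumerPropsB() {
        return baseProps("group-b"); // placeholder group id
    }

    public static void main(String[] args) {
        Properties a = getConsumerPropsA();
        Properties b = getConsumerPropsB();
        System.out.println(a.getProperty("group.id"));
        System.out.println(b.getProperty("group.id"));
    }
}
```

Because each call builds a new object, a change made to the configuration of one consumer cannot leak into the other.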
Below is my design:
Consumer class (abstract):
public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    protected abstract void shutdown();

    protected abstract void run(String consumerName, Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}
ConsumerA class:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerA extends Consumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private KafkaConsumer<byte[], byte[]> consumer;

    public ConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }

    @Override
    public void shutdown() {
        closed.set(true);
        // wakeup() is the only KafkaConsumer method that is safe to call from another thread
        consumer.wakeup();
    }

    @Override
    protected void run(String consumerName, Properties consumerProps) {
        consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(getTopicsBasisOnConsumerName());
        Map<String, Object> config = new HashMap<>();
        config.put(Config.URLS, TEST_URL);
        GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
        try {
            while (!closed.get()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    GenericRecord payload = decoder.decode(record.value());
                    // extract data from payload
                    System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException ex) {
            // Ignore exception if closing
            System.out.println("error= " + ex);
            if (!closed.get()) throw ex;
        } catch (Exception ex) {
            System.out.println("error= " + ex);
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}
ConsumerB class:
// similar to `ConsumerA` but with specific details of B
ConsumerHandler class:
public final class ConsumerHandler {
    private final ExecutorService executorServiceConsumer;
    private final Consumer consumer;
    private final List<Consumer> consumers = new ArrayList<>();

    public ConsumerHandler(Consumer consumer, int poolSize) {
        this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
        this.consumer = consumer;
        for (int i = 0; i < poolSize; i++) {
            this.consumers.add(consumer);
            executorServiceConsumer.submit(consumer);
        }
    }

    public void shutdown() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                for (Consumer consumer : consumers) {
                    consumer.shutdown();
                }
                executorServiceConsumer.shutdown();
                try {
                    executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
Below is the main class in one of my projects. When I start my server, this class is called first automatically, and from here I start all my Kafka consumers, ConsumerA and ConsumerB. As soon as shutdown is called, I release all the resources by calling shutdown on all my Kafka consumers.
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;

@Singleton
@DependencyInjectionInitializer
public class Initializer {
    private ConsumerHandler consumerHandlerA;
    private ConsumerHandler consumerHandlerB;

    @PostConstruct
    public void init() {
        consumerHandlerA = new ConsumerHandler(new ConsumerA("consumerA", getConsumerPropsA()), 3);
        consumerHandlerB = new ConsumerHandler(new ConsumerB("consumerB", getConsumerPropsB()), 3);
    }

    @PreDestroy
    public void shutdown() {
        consumerHandlerA.shutdown();
        consumerHandlerB.shutdown();
    }
}
Is this the right design for this kind of problem, where I want to run multiple Kafka consumers on the same box? Let me know if there is a better and more efficient way to solve it. In general I will be running three or four Kafka consumers at most on the same box, and each consumer can have its own consumer group if needed.
Here is the Javadoc for KafkaConsumer, which I am using in both my consumers. I created my consumer based on this article; the only difference is that I used an abstract class to extend it. Search for "Putting it all Together" in that link.
The docs mention that consumers are not thread-safe, but it looks like my code is reusing the same consumer instance for each thread in the pool:
public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
        this.consumers.add(consumer);
        executorServiceConsumer.submit(consumer);
    }
}
What is the best way to solve this thread safety issue and still achieve the same features?
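One possible direction, sketched here under assumptions rather than offered as a definitive fix: hand the handler a factory instead of a single instance, so that every pool thread runs its own consumer object, and each object would then create and own its own KafkaConsumer. As far as I can tell, in the code above each thread already creates a KafkaConsumer inside `run`, but all of them overwrite the same `consumer` field, so `shutdown()` would wake up only the last one. To keep the sketch runnable without a broker, `Worker`, `DemoWorker`, and `Handler` are hypothetical stand-ins for `Consumer`, `ConsumerA`, and `ConsumerHandler`; no Kafka API is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class PerThreadConsumerSketch {

    /** Hypothetical stand-in for the abstract Consumer class: Runnable plus shutdown. */
    public interface Worker extends Runnable {
        void shutdown();
    }

    /** Hypothetical stand-in for ConsumerA: loops until asked to stop. */
    public static final class DemoWorker implements Worker {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void run() {
            // A real consumer would create its own KafkaConsumer here and poll it.
            while (!closed.get()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        @Override
        public void shutdown() {
            closed.set(true);
        }
    }

    /** Hypothetical stand-in for ConsumerHandler: one worker instance per thread. */
    public static final class Handler {
        private final ExecutorService executor;
        private final List<Worker> workers = new ArrayList<>();

        public Handler(Supplier<? extends Worker> factory, int poolSize) {
            this.executor = Executors.newFixedThreadPool(poolSize);
            for (int i = 0; i < poolSize; i++) {
                Worker worker = factory.get(); // fresh instance, never shared across threads
                workers.add(worker);
                executor.submit(worker);
            }
        }

        public List<Worker> workers() {
            return workers;
        }

        /** Stops every worker and reports whether the pool terminated in time. */
        public boolean shutdown() {
            for (Worker worker : workers) {
                worker.shutdown();
            }
            executor.shutdown();
            try {
                return executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Handler handler = new Handler(DemoWorker::new, 3);
        System.out.println("workers started: " + handler.workers().size());
        System.out.println("terminated cleanly: " + handler.shutdown());
    }
}
```

With the real classes, the call site might then look like `new ConsumerHandler(() -> new ConsumerA("consumerA", getConsumerPropsA()), 3)`, assuming the `ConsumerHandler` constructor is changed to accept a `Supplier<Consumer>`.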