Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


java - How to run multiple Kafka consumers on the same box independently of each other?

I have two Kafka consumers, ConsumerA and ConsumerB. I want to run them independently of each other on the same machine. There is no relation between them at all, and each one will consume from different topics.

  • Each consumer should have its own Properties object.
  • Each consumer should have its own thread pool configuration, since each can run multithreaded (as a consumer group) if needed, independently of the other consumer.
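
To illustrate the first requirement, here is a minimal sketch of building a separate Properties object per consumer. The helper name `ConsumerConfigs.forGroup`, the broker address `localhost:9092`, and the group ids are assumptions made up for illustration; the property keys are standard Kafka consumer settings, and disabling auto-commit is assumed only because the code in this question commits offsets manually.

```java
import java.util.Properties;

// Hypothetical helper (not part of the original design): every call returns a
// brand-new Properties object, so ConsumerA and ConsumerB never share one.
final class ConsumerConfigs {
    private ConsumerConfigs() {}

    static Properties forGroup(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // The question uses KafkaConsumer<byte[], byte[]>, hence byte-array deserializers.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Assumption: offsets are committed manually (commitAsync/commitSync).
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group ids.
        Properties propsA = forGroup("localhost:9092", "group-a");
        Properties propsB = forGroup("localhost:9092", "group-b");
        System.out.println(propsA.getProperty("group.id") + " / " + propsB.getProperty("group.id"));
    }
}
```

Because each consumer gets its own group id, the two consumers would not take part in each other's partition assignment.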

Below is my design:

Consumer class (abstract):

public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    protected abstract void shutdown();
    protected abstract void run(String consumerName, Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}

ConsumerA class:

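import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerA extends Consumer {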
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private KafkaConsumer<byte[], byte[]> consumer;

    public ConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }

    @Override
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    @Override
    protected void run(String consumerName, Properties consumerProps) {
        consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(getTopicsBasisOnConsumerName());

        Map<String, Object> config = new HashMap<>();
        config.put(Config.URLS, TEST_URL);
        GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);

        try {
            while (!closed.get()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    GenericRecord payload = decoder.decode(record.value());
                    // extract data from payload
                    System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s%n",
                                      record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException ex) {
            // Ignore exception if closing
            System.out.println("error= " + ex);
            if (!closed.get()) throw ex;
        } catch (Exception ex) {
            System.out.println("error= " + ex);
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

ConsumerB class:

// similar to `ConsumerA` but with specific details of B

ConsumerHandler class:

public final class ConsumerHandler {
  private final ExecutorService executorServiceConsumer;
  private final Consumer consumer;
  private final List<Consumer> consumers = new ArrayList<>();

  public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
  }

  public void shutdown() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (Consumer consumer : consumers) {
          consumer.shutdown();
        }
        executorServiceConsumer.shutdown();
        try {
          executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
  }
}

Below is the main class in one of my projects. When I start my server, this class is invoked first automatically, and from here I start all my Kafka consumers, ConsumerA and ConsumerB. As soon as shutdown is called, I release all the resources by calling shutdown on all my Kafka consumers.

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;

@Singleton
@DependencyInjectionInitializer
public class Initializer {
  private ConsumerHandler consumerHandlerA;
  private ConsumerHandler consumerHandlerB;

  @PostConstruct
  public void init() {
    consumerHandlerA = new ConsumerHandler (new ConsumerA("consumerA", getConsumerPropsA()), 3);
    consumerHandlerB = new ConsumerHandler (new ConsumerB("consumerB", getConsumerPropsB()), 3);
  }

  @PreDestroy
  public void shutdown() {
    consumerHandlerA.shutdown();
    consumerHandlerB.shutdown();
  }
}

Is this the right design for running multiple Kafka consumers on the same box? Let me know if there is a better or more efficient way to solve this problem. In general I will be running three or four Kafka consumers at most on the same box, and each consumer can have its own consumer group if needed.

Here is the Javadoc for KafkaConsumer, which I am using in both of my consumers. I created my consumer based on this article; the only difference is that I used an abstract class to extend it. Search for "Putting it all Together" in that link.

The docs mention that KafkaConsumer is not thread-safe, but it looks like my code is reusing the same consumer instance for each thread in the pool:

public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
}

What is the best way to solve this thread safety issue and still achieve the same features?


1 Answer


A quick suggestion; apologies if you already know this. Class-level variables (fields) are not thread-safe on their own when several threads share the same object. If you need a different Properties object for every thread, declare it at the method level and pass it as a parameter to the other methods that need it.
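
Applied to the ConsumerHandler in the question: the same ConsumerA object is submitted to the pool several times, so every thread overwrites the one shared `consumer` field. One possible way around this is to pass the handler a factory and create a separate instance per thread. The sketch below is only an illustration under stated assumptions: `StoppableWorker`, `PerThreadHandler`, and `LoopingWorker` are hypothetical names, and `LoopingWorker` sleeps where a real worker would create and poll its own KafkaConsumer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical stand-in for the question's abstract Consumer class.
interface StoppableWorker extends Runnable {
    void shutdown();
}

// Placeholder worker: a real one would create its own KafkaConsumer inside
// run(), so the consumer is confined to the thread that polls it.
final class LoopingWorker implements StoppableWorker {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void run() {
        while (!closed.get()) {
            try {
                Thread.sleep(10); // stands in for consumer.poll(...)
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void shutdown() {
        closed.set(true); // a Kafka-backed worker would also call consumer.wakeup()
    }
}

final class PerThreadHandler {
    private final ExecutorService pool;
    private final List<StoppableWorker> workers = new ArrayList<>();

    // The factory is invoked once per pool thread, so no instance is shared.
    PerThreadHandler(Supplier<? extends StoppableWorker> factory, int poolSize) {
        this.pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < poolSize; i++) {
            StoppableWorker worker = factory.get();
            workers.add(worker);
            pool.submit(worker);
        }
    }

    List<StoppableWorker> workers() {
        return Collections.unmodifiableList(workers);
    }

    // Signals every worker, then waits for the pool to drain.
    // Returns true if all tasks finished within the timeout.
    boolean shutdown(long timeoutMillis) {
        for (StoppableWorker worker : workers) {
            worker.shutdown();
        }
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        PerThreadHandler handler = new PerThreadHandler(LoopingWorker::new, 3);
        System.out.println("workers started: " + handler.workers().size());
        System.out.println("terminated cleanly: " + handler.shutdown(1000));
    }
}
```

With the classes from the question, the equivalent call might look like `new PerThreadHandler(() -> new ConsumerA("consumerA", getConsumerPropsA()), 3)`, assuming Consumer were changed to implement such an interface. Each ConsumerA would then own its KafkaConsumer, and shutdown() would wake up the right one.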

