You have a few options as you stated.
If you convert it to a topic, the consumers will need to be persistent consumers (what JMS calls durable subscribers) to get the same effect. One thing a queue gives you is that messages are kept while your consumer isn't alive; whether and how a topic can do the same depends on the MQ system you are using.
If you want to stick with queues, you would create a queue for each consumer, plus a dispatcher that listens on the original queue and copies every message into each consumer's queue.
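To make the difference concrete, here is a toy in-memory model of a topic. It is only a sketch: `ToyTopic`, `Subscription`, and the `connected` flag are names made up for this illustration and are not part of JMS or of any broker. It shows that a durable subscriber keeps accumulating messages while it is disconnected, whereas a non-durable one misses them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory topic; illustrates durable vs. non-durable subscriptions only. */
class ToyTopic {

    /** One subscription: a backlog plus a flag saying whether the consumer is connected. */
    static final class Subscription {
        final boolean durable;
        boolean connected = true;
        final Queue<String> backlog = new ArrayDeque<>();

        Subscription(boolean durable) {
            this.durable = durable;
        }

        /** Removes and returns everything waiting for this subscriber. */
        List<String> drain() {
            List<String> out = new ArrayList<>(backlog);
            backlog.clear();
            return out;
        }
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    Subscription subscribe(String name, boolean durable) {
        Subscription s = new Subscription(durable);
        subscriptions.put(name, s);
        return s;
    }

    /** Connected subscribers always get a copy; disconnected ones only if durable. */
    void publish(String message) {
        for (Subscription s : subscriptions.values()) {
            if (s.connected || s.durable) {
                s.backlog.add(message);
            }
        }
    }
}
```

With a real JMS provider the equivalent would typically be a durable subscription created on the session (for example via `createDurableSubscriber`) on a connection with a client ID; how the backlog is stored is up to the broker.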
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3
Pros of Topics
- Easier to dynamically add new consumers. All consumers will get new messages without any work.
- Some brokers let you set up round-robin delivery on a topic, so that Consumer_1 will get a message, then Consumer_2, then Consumer_3.
- Consumers can have new messages pushed to them instead of having to poll a queue, which makes them reactive.
Cons of Topics
- Messages are not persistent unless your broker supports this configuration. If a consumer goes offline and comes back, it may have missed messages unless persistent (durable) consumers are set up.
- It is difficult to let Consumer_1 and Consumer_2 receive a message but not Consumer_3. With a dispatcher and queues, the dispatcher can simply skip Consumer_3's queue for that message.
Pros of Queues
- Messages are persistent until a consumer removes them.
- A dispatcher can control which consumers get which messages by not placing a message into a given consumer's queue. Topics can do this too, through filters (message selectors in JMS).
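The filtering point can be sketched without a broker. The class below is an in-memory stand-in, not JMS code: `FilteringDispatcher`, `addConsumer`, and the use of a `Predicate<String>` per queue are assumptions made for the illustration, and plain lists play the role of the consumer queues.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** In-memory stand-in for the dispatcher; lists play the role of consumer queues. */
class FilteringDispatcher {
    private final Map<String, Predicate<String>> filters = new LinkedHashMap<>();
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Registers a consumer queue together with the rule deciding what it receives. */
    void addConsumer(String queueName, Predicate<String> accepts) {
        filters.put(queueName, accepts);
        queues.put(queueName, new ArrayList<>());
    }

    /** Copies the message into every queue whose filter accepts it. */
    void dispatch(String message) {
        for (Map.Entry<String, Predicate<String>> entry : filters.entrySet()) {
            if (entry.getValue().test(message)) {
                queues.get(entry.getKey()).add(message);
            }
        }
    }

    /** Returns the messages currently waiting in the named queue. */
    List<String> queue(String queueName) {
        return queues.get(queueName);
    }
}
```

Registering Queue_Consumer_1 and Queue_Consumer_2 with `m -> true` and Queue_Consumer_3 with a narrower predicate gives the "1 and 2 but not 3" behaviour described above. In the JMS dispatcher the same decision would sit around the call that sends to each destination queue.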
Cons of Queues
- Additional queues need to be created to support multiple consumers. In a dynamic environment, where consumers come and go, this wouldn't be efficient.
When developing a messaging system I prefer topics, as they give me the most power, but since you are already using queues, implementing topics instead would require you to change how your system works.
Design and Implementation of Queue System with multiple consumers
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3
Source
Keep in mind there are other things you'll need to take care of, such as proper exception handling and re-establishing the connection and queues if you lose your connection. This is just designed to give you an idea of how to accomplish what I described.
In a real system I probably wouldn't exit on the first exception. I would let the system continue operating as best it could and log errors. As this code stands, if putting a message in a single consumer's queue fails, the whole dispatcher will stop.
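One way to get that "keep going and log" behaviour is to catch failures per destination rather than around the whole loop. The following is a minimal sketch, not part of the dispatcher itself: `TolerantFanOut` and its `Sender` interface are hypothetical stand-ins for the real send call, and strings stand in for messages and queues.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

class TolerantFanOut {

    /** Hypothetical stand-in for "send this message to that queue"; may fail. */
    interface Sender {
        void send(String queueName, String message) throws Exception;
    }

    private static final Logger LOG = Logger.getLogger(TolerantFanOut.class.getName());

    /**
     * Tries every destination, logging failures instead of aborting.
     * Returns the names of the queues that could not be reached.
     */
    static List<String> sendToAll(Sender sender, List<String> queueNames, String message) {
        List<String> failed = new ArrayList<>();
        for (String name : queueNames) {
            try {
                sender.send(name, message);
            } catch (Exception ex) {
                // One bad destination must not stop delivery to the others.
                LOG.log(Level.WARNING, "Could not deliver to " + name, ex);
                failed.add(name);
            }
        }
        return failed;
    }
}
```

What to do with the returned list (retry later, park the message somewhere, raise an alert) is a design decision this sketch leaves open.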
Dispatcher.java
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static final long QUEUE_WAIT_TIME = 1000;

    // volatile so the dispatcher thread sees stop() being called from another thread
    private volatile boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param sourceQueue
     *      Name of the queue the dispatcher reads messages from.
     * @param consumerQueues
     *      Names of the queues each received message is copied to.
     */
    public Dispatcher(
            QueueConnectionFactory factory,
            String sourceQueue,
            String[] consumerQueues) {
        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {
        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {
                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }
        } catch (JMSException ex) {
            // Do wonderful things here (at minimum, log the failure);
            // as written, any JMSException ends the dispatcher thread.
        } finally {
            // Release resources in reverse order of creation; a failure
            // while closing is ignored because nothing more can be done.
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}
Main.java
QueueConnectionFactory factory = ...;
Dispatcher dispatcher =
        new Dispatcher(
                factory,
                "Queue_Original",
                new String[]{
                    "Consumer_Queue_1",
                    "Consumer_Queue_2",
                    "Consumer_Queue_3"});
dispatcher.start();