Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

concurrency - How do you process messages in parallel while ensuring FIFO per entity?

Let's say you have an entity such as "Person" in your system, and you want to process events that modify various Person entities. It is important that:

  • Events for the same Person are processed in FIFO order
  • Event streams for different Persons are processed in parallel by different threads/processes

We have an implementation that solves this using a shared database and locks. Threads compete to acquire the lock for a Person and then process events in order after acquiring the lock. We'd like to move to a message queue to avoid polling and locking, which we feel would reduce load on the DB and simplify the implementation of the consumer code.

I've done some research into ActiveMQ, RabbitMQ, and HornetQ but I don't see an obvious way to implement this.

ActiveMQ supports consumer subscription wildcards, but I don't see a way to limit the concurrency on each queue to 1. If I could do that, then the solution would be straightforward:

  • Somehow tell the broker to allow a concurrency of 1 for each queue whose name starts with: /queue/person.
  • Publisher writes event to queue using Person ID in the queue name. e.g.: /queue/person.20
  • Consumers subscribe to the queue using wildcards: /queue/person.>
  • Each consumer would receive messages for different person queues. If all person queues were in use, some consumers may sit idle, which is ok
  • After processing a message, the consumer sends an ACK, which tells the broker it's done with the message, and allows another message for that Person queue to be sent to another consumer (possibly the same one)
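
To make the behaviour I'm hoping for concrete, the steps above can be sketched as a toy in-memory model. This is not any broker's API: PerKeyDispatcher, run_demo, and the "person.N" keys are hypothetical names made up for illustration, and a real broker would also need persistence, redelivery, and so on. The sketch only shows the rule "at most one unacknowledged message per Person queue, any idle consumer may take the next one".

```python
import threading
from collections import deque


class PerKeyDispatcher:
    """Toy stand-in for a broker: one FIFO per key, at most one
    unacknowledged (in-flight) message per key at any time."""

    def __init__(self):
        self._cond = threading.Condition()
        self._queues = {}        # key -> deque of pending messages (FIFO)
        self._in_flight = set()  # keys whose last message is not yet ACKed
        self._closed = False

    def publish(self, key, message):
        with self._cond:
            self._queues.setdefault(key, deque()).append(message)
            self._cond.notify_all()

    def close(self):
        """Signal that no more messages will be published."""
        with self._cond:
            self._closed = True
            self._cond.notify_all()

    def receive(self):
        """Block until some key has a pending message and nothing in flight.
        Returns (key, message), or None once closed and fully drained."""
        with self._cond:
            while True:
                for key, pending in self._queues.items():
                    if pending and key not in self._in_flight:
                        self._in_flight.add(key)
                        return key, pending.popleft()
                if self._closed and not any(self._queues.values()):
                    return None
                self._cond.wait()

    def ack(self, key):
        """Release the key so its next message can go to any consumer."""
        with self._cond:
            self._in_flight.discard(key)
            self._cond.notify_all()


def run_demo(num_consumers=4, persons=5, events_per_person=20):
    broker = PerKeyDispatcher()
    # Record the order in which each Person's events were processed.
    processed = {f"person.{p}": [] for p in range(persons)}

    def consumer():
        while True:
            item = broker.receive()
            if item is None:
                return
            key, message = item
            processed[key].append(message)  # only one consumer holds this key
            broker.ack(key)

    threads = [threading.Thread(target=consumer) for _ in range(num_consumers)]
    for t in threads:
        t.start()
    for seq in range(events_per_person):
        for p in range(persons):
            broker.publish(f"person.{p}", seq)
    broker.close()
    for t in threads:
        t.join()
    return processed
```

Calling run_demo() returns a dict mapping each Person key to the sequence numbers in the order they were processed. Because a key is released only on ack, each list should come out in publish order even though several consumer threads share the work.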

ActiveMQ came close: You can do wildcard subscriptions and enable "exclusive consumer", but that combination results in a single consumer receiving all messages sent to all matching queues, reducing your concurrency to 1 across all Persons. I feel like I'm missing something obvious.

Questions:

  • Is there a way to implement the above approach with any major message queue implementation? We are fairly open to options. The only requirement is that it run on Linux.
  • Is there a different way to solve the general problem that I'm not considering?

Thanks!


1 Answer

It looks like JMSXGroupID is what I'm looking for. From the ActiveMQ docs:

http://activemq.apache.org/message-groups.html

Their example use case with stock prices is exactly what I'm after. My only concern is what happens if the single consumer dies. Hopefully the broker will detect that and pick another consumer to associate with that group id.
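
For anyone trying to picture how message groups behave, here is a rough model of my understanding. In JMS the publisher would set the group with message.setStringProperty("JMSXGroupID", ...), using something like the Person ID as the value. The Python below is not ActiveMQ code: GroupRouter and the consumer names are hypothetical, and the round-robin choice of a new owner is an assumption made to keep the sketch small. As I read the linked page, a group stays with its consumer until that consumer closes or dies, after which another consumer is chosen; the sketch mimics that by dropping ownership when a consumer is removed.

```python
class GroupRouter:
    """Toy model of sticky group-to-consumer assignment.

    Each group id is pinned to one consumer, so messages for the same
    group are handled serially by that consumer, while different groups
    can be spread across consumers.
    """

    def __init__(self):
        self._consumers = []  # consumer names, in registration order
        self._owner = {}      # group id -> consumer name
        self._next = 0        # round-robin counter (an assumption)

    def add_consumer(self, name):
        self._consumers.append(name)

    def remove_consumer(self, name):
        """Simulate a consumer closing or dying: forget its groups so
        they are reassigned when their next message arrives."""
        self._consumers.remove(name)
        self._owner = {g: c for g, c in self._owner.items() if c != name}

    def route(self, group_id):
        """Return the consumer that should receive the next message
        for this group, assigning an owner if the group has none."""
        if not self._consumers:
            raise RuntimeError("no consumers available")
        owner = self._owner.get(group_id)
        if owner is None:
            owner = self._consumers[self._next % len(self._consumers)]
            self._next += 1
            self._owner[group_id] = owner
        return owner


router = GroupRouter()
router.add_consumer("consumer-a")
router.add_consumer("consumer-b")

first = router.route("person.20")           # group gets pinned to a consumer
same = router.route("person.20")            # later messages follow the pin
router.remove_consumer(first)               # the owning consumer goes away
after_failover = router.route("person.20")  # group moves to a survivor
```

In this model FIFO per Person holds as long as each consumer processes its own messages in order, which is the same assumption the real feature relies on.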

