I was able to solve this issue by refactoring the retry attempts loop and the on_error handler. Also, I have installed and configured supervisor in the docker container to run and manage the listener process. That way if the listener program stops it will be automatically restarted by the supervisor process manager.
Updated python stomp listener script
init_listener.py
import os, json, time, datetime, stomp
_host = os.getenv('MQ_HOST')
_port = os.getenv('MQ_PORT')
_user = os.getenv('MQ_USER')
_password = os.getenv('MQ_PASSWORD')
# The listener will listen for messages that are relevant to this specific worker
# Queue name must match the 'worker_type' in job tracker file
_queue = os.getenv('QUEUE_NAME')
# Subscription id is unique to the subscription in this case there is only one subscription per connection
_sub_id = 1
_reconnect_attempts = 0
_max_attempts = 1000
def connect_and_subscribe(conn):
global _reconnect_attempts
_reconnect_attempts = _reconnect_attempts + 1
if _reconnect_attempts <= _max_attempts:
try:
conn.connect(_user, _password, wait=True)
print('connect_and_subscribe connecting {} to with connection id {} reconnect attempts: {}'.format(_queue, _sub_id, _reconnect_attempts), flush=True)
except Exception as e:
print('Exception on disconnect. reconnecting...')
print(e)
connect_and_subscribe(conn)
else:
conn.subscribe(destination=_queue, id=_sub_id, ack='client-individual')
_reconnect_attempts = 0
else:
print('Maximum reconnect attempts reached for this connection. reconnect attempts: {}'.format(_reconnect_attempts), flush=True)
class MqListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
self._sub_id = _sub_id
print('MqListener init')
def on_error(self, headers, body):
print('received an error "%s"' % body)
def on_message(self, headers, body):
print('received a message headers "%s"' % headers)
print('message body "%s"' % body)
message_id = headers.get('message-id')
message_data = json.loads(body)
task_name = message_data.get('task_name')
prev_status = message_data.get('previous_step_status')
if prev_status == "success":
print('CALLING DO TASK')
resp = True
else:
print('CALLING REVERT TASK')
resp = True
if (resp):
print('Ack message_id {}'.format(message_id))
self.conn.ack(message_id, self._sub_id)
else:
print('NON Ack message_id {}'.format(message_id))
self.conn.nack(message_id, self._sub_id)
print('processed message message_id {}'.format(message_id))
def on_disconnected(self):
print('disconnected! reconnecting...')
connect_and_subscribe(self.conn)
def initialize_mqlistener():
conn = stomp.Connection([(_host, _port)], heartbeats=(4000, 4000))
conn.set_listener('', MqListener(conn))
connect_and_subscribe(conn)
# https://github.com/jasonrbriggs/stomp.py/issues/206
while True:
time.sleep(2)
if not conn.is_connected():
print('Disconnected in loop, reconnecting')
connect_and_subscribe(conn)
if __name__ == '__main__':
initialize_mqlistener()
Supervisor installation and configuration
Dockerfile
Some details removed for brevity
# Install supervisor
RUN apt-get update && apt-get install -y supervisor
# Add the supervisor configuration file
ADD supervisord.conf /etc/supervisor/conf.d/supervisord.conf
# Start supervisor with the configuration file
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
supervisor.conf
[supervisord]
nodaemon=true
logfile=/home/exampleuser/logs/supervisord.log
[program:mqutils]
command=python3 init_listener.py
directory=/home/exampleuser/mqutils
user=exampleuser
autostart=true
autorestart=true
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…