Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

spring boot - Client acknowledgement mode doesn't work in activemq

I spent a long time on this problem and I can't figure it out. Maybe I'm not understanding client acknowledgment correctly?

I have an ActiveMQ consumer that I configured for client acknowledgement. When I check the acknowledgement mode in the debugger, I see that it is correctly set to 2 (Session.CLIENT_ACKNOWLEDGE).

I wanted to check whether, if I don't call message.acknowledge(), the message is redelivered or returned to the broker.

However, that doesn't happen. Even though I never acknowledge the message, it still shows as successfully dequeued.

What am I doing wrong?

package test.config;

import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class ConnectionFactoryConfig {

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String username;

    @Value("${spring.activemq.password}")
    private String password;


    @Bean
    public PooledConnectionFactory pooledConnectionFactory() {
        final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(createActiveMQConnectionFactory());
        pooledConnectionFactory.setMaxConnections(2);
        return pooledConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(pooledConnectionFactory());
        jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return jmsTemplate;
    }

    @Bean
    public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(pooledConnectionFactory());
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); // constant value 2
        return factory;
    }


    private ActiveMQConnectionFactory createActiveMQConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUserName(username);
        factory.setPassword(password);
        factory.setRedeliveryPolicy(createRedeliveryPolicy());
        return factory;
    }

    private RedeliveryPolicy createRedeliveryPolicy() {
        final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(5);
        redeliveryPolicy.setRedeliveryDelay(13000);
        return redeliveryPolicy;
    }
}

CONSUMER

package poc.consumers;

import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @JmsListener(destination = "queue.test", containerFactory = "myJmsListenerContainerFactory")
    public void consume(Message message) {
        System.out.println("Consumer");
    }
}

Question from: https://stackoverflow.com/questions/65909581/client-acknowledgement-mode-doesnt-work-in-activemq

1 Answer

The message will only be requeued if the listener throws an exception (or if you shut down the app or otherwise close the Session).

As far as the broker knows, your consumer is still working on it.

See Session.recover(). The first snippet below is the code in the container that is called when the listener throws an exception; it is followed by the Javadoc and signature of Session.recover().

else if (isClientAcknowledge(session)) {
    session.recover();
}

/** Stops message delivery in this session, and restarts message delivery
  * with the oldest unacknowledged message.
  *
  * <P>All consumers deliver messages in a serial order.
  * Acknowledging a received message automatically acknowledges all
  * messages that have been delivered to the client.
  *
  * <P>Restarting a session causes it to take the following actions:
  *
  * <UL>
  *   <LI>Stop message delivery
  *   <LI>Mark all messages that might have been delivered but not
  *       acknowledged as "redelivered"
  *   <LI>Restart the delivery sequence including all unacknowledged
  *       messages that had been previously delivered. Redelivered messages
  *       do not have to be delivered in
  *       exactly their original delivery order.
  * </UL>
  *
  * @exception JMSException if the JMS provider fails to stop and restart
  *                         message delivery due to some internal error.
  * @exception IllegalStateException if the method is called by a
  *                         transacted session.
  */
void recover() throws JMSException;
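
To make the recover() semantics concrete, here is a minimal toy model in plain Java. It is a sketch, not the javax.jms or Spring API: ToyMessage, ToySession, RecoverDemo and deliverOnce are hypothetical names invented for illustration. It assumes the container acknowledges the message when the listener returns normally (which, as far as I can tell, is how Spring's listener container Javadoc describes CLIENT_ACKNOWLEDGE) and calls recover() when the listener throws, as in the container snippet above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy message: a payload plus the "redelivered" flag that recover() sets. */
final class ToyMessage {
    final String body;
    boolean redelivered;
    ToyMessage(String body) { this.body = body; }
}

/** Toy stand-in for a CLIENT_ACKNOWLEDGE session; NOT the javax.jms API. */
final class ToySession {
    private final Deque<ToyMessage> pending = new ArrayDeque<>();
    private final List<ToyMessage> unacked = new ArrayList<>();
    int dequeued; // messages the toy "broker" counts as consumed

    void send(String body) { pending.addLast(new ToyMessage(body)); }

    /** Delivers the next message, or null if none; it stays unacknowledged. */
    ToyMessage receive() {
        ToyMessage m = pending.pollFirst();
        if (m != null) unacked.add(m);
        return m;
    }

    /** Acknowledges everything delivered so far on this session. */
    void acknowledge() {
        dequeued += unacked.size();
        unacked.clear();
    }

    /** Marks unacknowledged messages as redelivered and queues them again. */
    void recover() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            ToyMessage m = unacked.get(i);
            m.redelivered = true;
            pending.addFirst(m);
        }
        unacked.clear();
    }
}

final class RecoverDemo {
    /** One container-style delivery: acknowledge on success, recover on exception. */
    static boolean deliverOnce(ToySession session, Consumer<ToyMessage> listener) {
        ToyMessage m = session.receive();
        if (m == null) return false;
        try {
            listener.accept(m);
            session.acknowledge();   // listener returned normally
        } catch (RuntimeException ex) {
            session.recover();       // listener threw: ask for redelivery
        }
        return true;
    }

    public static void main(String[] args) {
        ToySession session = new ToySession();
        session.send("order-1");

        // First attempt throws, so the message is recovered, not dequeued.
        deliverOnce(session, m -> { throw new IllegalStateException("processing failed"); });
        System.out.println("dequeued after failure: " + session.dequeued);

        // Second attempt returns normally; the message arrives flagged as redelivered.
        deliverOnce(session, m -> System.out.println(m.body + " redelivered=" + m.redelivered));
        System.out.println("dequeued after success: " + session.dequeued);
    }
}
```

In this model, a listener that simply prints and returns, like the consume() method in the question, counts as a successful delivery, so the message is dequeued. Only the throwing listener triggers redelivery.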

For other readers: if you use AUTO_ACKNOWLEDGE with a DefaultMessageListenerContainer, you must use transactions to roll back the ack.
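
As a rough sketch of the transactional option, a listener container factory can be switched to a locally transacted session with setSessionTransacted(true). The class name TransactedListenerConfig and the bean name transactedListenerContainerFactory are hypothetical, and the sketch assumes a javax.jms.ConnectionFactory bean (for example, the pooled factory from the question) is defined elsewhere in the application.

```java
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
public class TransactedListenerConfig {

    // Hypothetical bean name; the ConnectionFactory is assumed to be defined elsewhere.
    @Bean
    public DefaultJmsListenerContainerFactory transactedListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Locally transacted session: the container commits after the listener
        // returns and rolls back if it throws, so the broker can redeliver.
        factory.setSessionTransacted(true);
        return factory;
    }
}
```

A listener would then reference it with containerFactory = "transactedListenerContainerFactory" in its @JmsListener annotation, and any redelivery limits would still come from the RedeliveryPolicy set on the ActiveMQ connection factory.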

