Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

rabbitmq - Spring Rabbit retries delivery of a rejected message. Is that OK?

I have the following configuration:

spring.rabbitmq.listener.prefetch=1
spring.rabbitmq.listener.concurrency=1
spring.rabbitmq.listener.retry.enabled=true
spring.rabbitmq.listener.retry.max-attempts=3
spring.rabbitmq.listener.retry.max-interval=1000
# I have also changed this to true, but the same behavior still happens.
spring.rabbitmq.listener.default-requeue-rejected=false

In my listener I throw AmqpRejectAndDontRequeueException to reject the message and force Rabbit not to redeliver it. But Rabbit redelivers it 3 times and then finally routes it to the dead letter queue.

Is that the standard behavior for the configuration I provided, or am I missing something?

1 Answer

You have to configure the retry policy to not retry for that exception.

You can't do that with properties; you have to configure the retry advice yourself.

I'll post an example later if you need help with that.

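default-requeue-rejected is applied at the container level (below retry on the stack), so it only comes into play once the retry advice has given up and the exception reaches the container.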
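To make the layering concrete, here is a minimal plain-Java sketch of a retry layer wrapped around a listener. It is a simplified model, not Spring's actual code: RetryLayerSketch, deliver and RejectException are hypothetical names, with RejectException standing in for AmqpRejectAndDontRequeueException. It only counts how often the listener is invoked before the failure would reach the container level.

```java
import java.util.function.Predicate;

/** Simplified model of a retry advice wrapped around a listener; not Spring's actual code. */
class RetryLayerSketch {

    /** Hypothetical stand-in for AmqpRejectAndDontRequeueException. */
    static class RejectException extends RuntimeException {
        RejectException(String message) {
            super(message);
        }
    }

    /**
     * Invokes the listener until it succeeds, the exception is classified as
     * not retryable, or maxAttempts is reached. Returns the number of invocations.
     */
    static int deliver(Runnable listener, int maxAttempts, Predicate<RuntimeException> retryable) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.run();
                return attempts; // success, nothing to reject
            } catch (RuntimeException e) {
                if (attempts >= maxAttempts || !retryable.test(e)) {
                    // Only at this point would the failure reach the container level,
                    // where the message is rejected (and dead-lettered if so configured).
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        Runnable alwaysRejects = () -> { throw new RejectException("foo"); };
        // Every exception treated as retryable, as observed in the question: prints 3
        System.out.println(deliver(alwaysRejects, 3, e -> true));
        // Reject exception classified as not retryable: prints 1
        System.out.println(deliver(alwaysRejects, 3, e -> !(e instanceof RejectException)));
    }
}
```

With a policy that treats every exception as retryable, the reject exception never gets past the retry layer until the attempts are used up, which matches the three deliveries described in the question.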

EDIT

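import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;

@SpringBootApplication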
public class So39853762Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39853762Application.class, args);
        Thread.sleep(60000);
        context.close();
    }

    @RabbitListener(queues = "foo")
    public void foo(String foo) {
        System.out.println(foo);
        if ("foo".equals(foo)) {
            throw new AmqpRejectAndDontRequeueException("foo"); // won't be retried.
        }
        else {
            throw new IllegalStateException("bar"); // will be retried
        }
    }

    @Bean
    public ListenerRetryAdviceCustomizer retryCustomizer(SimpleRabbitListenerContainerFactory containerFactory,
            RabbitProperties rabbitProperties) {
        return new ListenerRetryAdviceCustomizer(containerFactory, rabbitProperties);
    }

    public static class ListenerRetryAdviceCustomizer implements InitializingBean {

        private final SimpleRabbitListenerContainerFactory containerFactory;

        private final RabbitProperties rabbitProperties;

        public ListenerRetryAdviceCustomizer(SimpleRabbitListenerContainerFactory containerFactory,
                RabbitProperties rabbitProperties) {
            this.containerFactory = containerFactory;
            this.rabbitProperties = rabbitProperties;
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            // Reuse the spring.rabbitmq.listener.retry.* properties for the custom advice.
            ListenerRetry retryConfig = this.rabbitProperties.getListener().getRetry();
            if (retryConfig.isEnabled()) {
                RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
                        ? RetryInterceptorBuilder.stateless()
                        : RetryInterceptorBuilder.stateful());
                Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
                retryableExceptions.put(AmqpRejectAndDontRequeueException.class, false);
                retryableExceptions.put(IllegalStateException.class, true);
                SimpleRetryPolicy policy =
                        new SimpleRetryPolicy(retryConfig.getMaxAttempts(), retryableExceptions, true);
                ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
                backOff.setInitialInterval(retryConfig.getInitialInterval());
                backOff.setMultiplier(retryConfig.getMultiplier());
                backOff.setMaxInterval(retryConfig.getMaxInterval());
                builder.retryPolicy(policy)
                    .backOffPolicy(backOff)
                    .recoverer(new RejectAndDontRequeueRecoverer());
                this.containerFactory.setAdviceChain(builder.build());
            }
        }

    }

}
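
For reference, the back-off settings copied from the properties above (initial interval, multiplier, max interval) produce a capped, exponentially growing pause between attempts. The following is a rough sketch of that arithmetic only, not the library's implementation; BackOffSketch and sleepIntervals are hypothetical names, and the values in main are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Rough model of a capped exponential back-off schedule; illustrative only. */
class BackOffSketch {

    /**
     * Returns the pauses (in milliseconds) between attempts. There is one pause
     * after each failed attempt except the last, so maxAttempts - 1 in total.
     */
    static List<Long> sleepIntervals(int maxAttempts, long initialInterval,
            double multiplier, long maxInterval) {
        List<Long> sleeps = new ArrayList<>();
        long interval = initialInterval;
        for (int failure = 1; failure < maxAttempts; failure++) {
            sleeps.add(Math.min(interval, maxInterval)); // never pause longer than the cap
            interval = (long) (interval * multiplier);   // grow for the next pause
        }
        return sleeps;
    }

    public static void main(String[] args) {
        // 4 attempts, 500 ms initial, doubling, capped at 1500 ms: prints [500, 1000, 1500]
        System.out.println(sleepIntervals(4, 500L, 2.0, 1500L));
    }
}
```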

NOTE: You cannot currently configure the policy to retry all exceptions "except" this one; you have to classify every exception you want retried (and none of them can be a superclass of AmqpRejectAndDontRequeueException). I have opened an issue to support this.
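
The superclass restriction can be illustrated with a small plain-Java model of the classification. This is a simplified sketch, not Spring Retry's actual classifier. It assumes that the listener's exception reaches the policy wrapped in another runtime exception, that each exception is matched against its nearest classified superclass, and that the cause chain is only consulted when the outer exception is unclassified. ClassifierSketch, WrapperException and RejectException are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of retryable-exception classification; not Spring Retry's code. */
class ClassifierSketch {

    /** Hypothetical wrapper, standing in for whatever wraps the listener's exception. */
    static class WrapperException extends RuntimeException {
        WrapperException(Throwable cause) {
            super(cause);
        }
    }

    /** Hypothetical stand-in for AmqpRejectAndDontRequeueException. */
    static class RejectException extends RuntimeException {
        RejectException(String message) {
            super(message);
        }
    }

    /** Finds the rule for the nearest classified class in the hierarchy, or null. */
    static Boolean lookup(Map<Class<? extends Throwable>, Boolean> rules, Class<?> type) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            Boolean rule = rules.get(c);
            if (rule != null) {
                return rule;
            }
        }
        return null;
    }

    /** The first classified exception in the cause chain decides; unclassified means no retry. */
    static boolean shouldRetry(Map<Class<? extends Throwable>, Boolean> rules, Throwable thrown) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            Boolean rule = lookup(rules, t.getClass());
            if (rule != null) {
                return rule;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, Boolean> rules = new HashMap<>();
        rules.put(RejectException.class, false);
        rules.put(IllegalStateException.class, true);
        Throwable rejected = new WrapperException(new RejectException("foo"));

        System.out.println(shouldRetry(rules, rejected)); // prints false
        // Classifying a shared superclass matches the wrapper first and hides the reject rule.
        rules.put(RuntimeException.class, true);
        System.out.println(shouldRetry(rules, rejected)); // prints true
    }
}
```

Under these assumptions, adding a broad entry such as RuntimeException makes the outer exception match first, so the more specific "do not retry" rule is never reached.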

