Задержка приёма следующего JMS-сообщения после ошибки

Вариант реализации JmsListenerContainerFactory (на основе DefaultJmsListenerContainerFactory), который откладывает приём следующего JMS-сообщения, если при обработке предыдущего было выброшено необработанное исключение:

import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpoint;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ErrorHandler;

import javax.annotation.PostConstruct;
import javax.jms.*;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.time.temporal.ChronoUnit.MINUTES;

/**
 * {@link JmsListenerContainerFactory} that pauses message consumption after a failure.
 *
 * <p>If processing a message throws an unhandled exception, the listener container is
 * stopped and a restart is scheduled {@code afterErrorSleepDurationMinutes} minutes later,
 * so the next message is not received before that delay has elapsed.
 *
 * <p>Container creation is delegated to a {@link DefaultJmsListenerContainerFactory}
 * subclass that produces containers which remember the message whose listener invocation
 * failed, so that the message can be included in the error log.
 */
@Slf4j
public class PauseOnErrorJmsListenerContainerFactory implements JmsListenerContainerFactory<MessageListenerContainer> {
    /** Scheduler used to restart a stopped container after the pause. */
    @Autowired
    private TaskScheduler taskScheduler;

    /** Renders JMS messages as strings for log output. */
    @Autowired
    private JmsMessageFormat jmsMessageFormat;

    /** Length of the pause, in minutes, between stopping a container and restarting it. */
    @Setter
    private int afterErrorSleepDurationMinutes;

    /** Connection factory handed to the delegate factory in {@link #init()}. */
    @Setter
    private ConnectionFactory connectionFactory;

    /** Message converter handed to the delegate factory in {@link #init()}. */
    @Setter
    private MessageConverter messageConverter;

    /** Transaction manager handed to the delegate factory in {@link #init()}. */
    @Setter
    private PlatformTransactionManager transactionManager;

    // ---- //

    /** Factory that actually builds the containers; created in {@link #init()}. */
    private DefaultJmsListenerContainerFactory delegate;

    /**
     * Maps an exception thrown by a listener invocation to the message that was being
     * processed. Entries are added in {@code CustomMessageListenerContainer#invokeListener}
     * and removed when the error handler logs the exception.
     */
    private ConcurrentMap<Throwable, Message> exceptions2messages;

    /**
     * Builds the delegate factory from the configured collaborators and initialises the
     * exception-to-message registry. Must run after all setters have been applied.
     */
    @PostConstruct
    public void init() {
        delegate = new FactoryDelegate();
        delegate.setConnectionFactory(connectionFactory);
        delegate.setMessageConverter(messageConverter);
        delegate.setTransactionManager(transactionManager);
        exceptions2messages = new ConcurrentHashMap<>();
    }

    // ---- //

    /**
     * Creates a listener container for the endpoint and installs a
     * {@link ContainerStoppingErrorHandler} on it, both as the error handler and as the
     * JMS exception listener.
     *
     * @param endpoint the endpoint to create a container for
     * @return the configured container
     */
    @Override
    public @NonNull
    MessageListenerContainer createListenerContainer(@NonNull JmsListenerEndpoint endpoint) {
        DefaultMessageListenerContainer result =
            delegate.createListenerContainer(endpoint);

        ErrorHandler csErrorHandler = makeErrorHandler(result);

        result.setErrorHandler(csErrorHandler);
        result.setExceptionListener(csErrorHandler::handleError);

        return result;
    }

    /**
     * Creates the error handler bound to the given container.
     *
     * @param messageListenerContainer the container the handler will stop and restart
     * @return a new handler using this factory's scheduler and pause duration
     */
    @NonNull
    private ContainerStoppingErrorHandler makeErrorHandler(
        @NonNull DefaultMessageListenerContainer messageListenerContainer
    ) {
        return new ContainerStoppingErrorHandler(
            taskScheduler,
            messageListenerContainer,
            afterErrorSleepDurationMinutes
        );
    }

    /** Delegate factory that instantiates {@link CustomMessageListenerContainer}. */
    private class FactoryDelegate extends DefaultJmsListenerContainerFactory {
        @NonNull
        @Override
        protected DefaultMessageListenerContainer createContainerInstance() {
            return new CustomMessageListenerContainer();
        }
    }

    /**
     * Container that logs every received message and records the message associated with
     * any {@link JMSException} or {@link RuntimeException} thrown by the listener.
     */
    private class CustomMessageListenerContainer extends DefaultMessageListenerContainer {
        /**
         * Logs the incoming message at INFO level, then invokes the listener. If the
         * listener fails, the exception is registered together with the message and
         * rethrown unchanged.
         *
         * @param session the JMS session
         * @param message the received message
         * @throws JMSException if the listener invocation throws it
         */
        @Override
        protected void invokeListener(@NonNull Session session, @NonNull Message message) throws JMSException {
            if (log.isInfoEnabled()) {
                String dn = super.getDestinationName();
                String messageStr = jmsMessageFormat.format(message);
                log.info("New message received from {}: {}.", dn, messageStr);
            }

            try {
                super.invokeListener(session, message);

            } catch (JMSException | RuntimeException e) {
                // Remember which message caused the failure so the error handler can log it.
                exceptions2messages.put(e, message);
                throw e;
            }
        }
    }

    /**
     * Error handler that stops its container on the first reported error and schedules a
     * restart after the configured pause. Errors reported while a stop is already in
     * progress are collected and logged together once the stop callback runs.
     */
    @RequiredArgsConstructor
    public class ContainerStoppingErrorHandler implements ErrorHandler {
        private final TaskScheduler taskScheduler;
        private final DefaultMessageListenerContainer mlc;
        private final int durationAfterErrorMinutes;
        /** {@code true} from the moment a stop is requested until its callback starts. */
        private final AtomicBoolean currentlyStopping = new AtomicBoolean(false);
        /** Errors reported but not yet logged. */
        private final Set<Throwable> exceptions = ConcurrentHashMap.newKeySet();

        /**
         * Records the error and, unless a stop is already pending, asks the container to
         * stop. The stop callback logs every pending error and schedules the restart.
         *
         * @param throwable the reported error
         */
        @Override
        public void handleError(@NonNull Throwable throwable) {
            exceptions.add(throwable);

            if (currentlyStopping.compareAndSet(false, true)) {
                mlc.stop(() -> {
                    currentlyStopping.set(false);

                    Instant startTime = calcNextStartTime();

                    for (Throwable exception : exceptions) {
                        // Claim each exception individually rather than clearing the whole
                        // set afterwards: a bulk clear() could discard an exception that
                        // another thread added after the iteration had passed it, leaving
                        // it unlogged and its message stuck in exceptions2messages.
                        if (exceptions.remove(exception)) {
                            Message message = exceptions2messages.remove(exception);
                            logStoppingDuoError(mlc, startTime, exception, message);
                        }
                    }

                    taskScheduler.schedule(mlc::start, startTime);
                });
            }
        }

        /**
         * @return the current instant plus the configured pause
         */
        private @NonNull
        Instant calcNextStartTime() {
            return Instant.now().plus(durationAfterErrorMinutes, MINUTES);
        }

        /**
         * Logs at ERROR level that listening is being stopped until {@code startTime}.
         * Does nothing when ERROR logging is disabled.
         *
         * @param mlc            the container being stopped
         * @param startTime      the instant at which the restart is scheduled
         * @param throwable      the error that caused the stop
         * @param inboundMessage the message being processed when the error occurred, or
         *                       {@code null} if no message was registered for the error
         */
        private void logStoppingDuoError(@NonNull DefaultMessageListenerContainer mlc,
                                         @NonNull Instant startTime,
                                         @NonNull Throwable throwable,
                                         @Nullable Message inboundMessage) {
            if (!log.isErrorEnabled()) {
                return;
            }

            // Show the restart time in the system default time zone.
            ZoneId zone = ZoneId.systemDefault();
            ZonedDateTime startTimeLdt = ZonedDateTime.ofInstant(startTime, zone);

            if (inboundMessage != null) {
                String inboundMessageStr = jmsMessageFormat.format(inboundMessage);
                log.error(""
                        + "Stopping listening '{}' duo message processing "
                        + "error until {}. Input message: {}.",
                    getDestinationName(mlc), startTimeLdt, inboundMessageStr,
                    throwable
                );

            } else {
                log.error(
                    "Stopping listening '{}' duo queue access error until {}.",
                    getDestinationName(mlc), startTimeLdt,
                    throwable
                );
            }
        }

        /**
         * Resolves a printable destination name for the container: the configured
         * destination name, otherwise the queue name, otherwise the destination's
         * {@code toString()}. Any exception during resolution is logged and a placeholder
         * is returned.
         *
         * @param mlc the container to inspect
         * @return the destination name, or {@code "Unknown destination"} if none is found
         */
        private @NonNull
        String getDestinationName(@NonNull DefaultMessageListenerContainer mlc) {
            try {
                String result = mlc.getDestinationName();
                if (result != null) {
                    return result;
                }

                Destination destination = mlc.getDestination();
                if (destination instanceof Queue) {
                    return ((Queue) destination).getQueueName();
                }
                if (destination != null) {
                    return destination.toString();
                }

            } catch (Exception e) {
                log.error("Destination name extraction error", e);
            }

            return "Unknown destination";
        }
    }
}

Инициализация выглядит следующим образом:

    /**
     * Registers the pause-on-error listener container factory.
     *
     * @param connectionFactory  the JMS connection factory passed to the factory
     * @param transactionManager the transaction manager passed to the factory
     * @return the factory configured with the pause duration, connection factory and
     *         transaction manager
     */
    @Bean
    public PauseOnErrorJmsListenerContainerFactory jmsListenerContainerFactory(
        ConnectionFactory connectionFactory,
        PlatformTransactionManager transactionManager
    ) {
        PauseOnErrorJmsListenerContainerFactory pausingFactory =
            new PauseOnErrorJmsListenerContainerFactory();

        // Pause length applied after an unhandled processing error.
        pausingFactory.setAfterErrorSleepDurationMinutes(afterErrorDurationMinutes);

        pausingFactory.setConnectionFactory(connectionFactory);
        pausingFactory.setTransactionManager(transactionManager);

        return pausingFactory;
    }
(Просмотрено 302 раз, 1 раз за сегодня)
Вы можете оставить комментарий, или Трекбэк с вашего сайта.

Оставить комментарий