【问题标题】:Spring JMS listener for AWS SQS queue configuration with concurrency具有并发性的 AWS SQS 队列配置的 Spring JMS 侦听器
【发布时间】:2020-01-07 06:54:52
【问题描述】:

我正在尝试弄清楚如何配置 jms 侦听器以侦听 AWS 队列并在多个线程中处理消息(同时处理约 100 个线程)。

下面是我的配置。

@Configuration
@EnableJms
public class JmsConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(AmazonSQS amazonSQS) {
        ProviderConfiguration providerConfiguration = new ProviderConfiguration().withNumberOfMessagesToPrefetch(0);
        SQSConnectionFactory connectionFactory = new SQSConnectionFactory(providerConfiguration, amazonSQS);
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setConcurrency("30-100");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION);
        factory.setErrorHandler(t -> {
        });
        return factory;
    }

}

使用此配置,我不断收到以下错误:

SQSMessageConsumer - 无法终止执行程序服务 ConsumerPrefetch 30秒后,一些正在运行的线程将立即关闭

此外,将消息发布到 AmazonSQS 实例需要 20 秒。

我尝试了 NumberOfMessagesToPrefetchCacheLevel 的不同组合,但没有任何效果。

例如CacheLevel = CACHE_CONSUMER 正常工作,但一次处理 1 条消息。

请帮我配置一下。 谢谢!

图书馆:

  • aws-java-sdk:1.11.41

  • spring-jms:5.1.7

  • amazon-sqs-java-messaging-lib:1.0.6

【问题讨论】:

    标签: java spring jms amazon-sqs


    【解决方案1】:

    @Boris 我能够通过以下方式解决此问题。我没有设置缓存级别。提供程序配置仍在禁用预取

    ProviderConfiguration providerConfiguration = new ProviderConfiguration().withNumberOfMessagesToPrefetch(0);

    另外,我使用DefaultJmsListenerContainerFactory,然后设置以下值,除了未显示的其他值:


    DefaultJmsListenerContainerFactory

    ​​>

    factory.setConcurrency("5-10"); //This will depend on your use case factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); //Used to in-flight erroneous messages factory.setTaskExecutor(threadPoolTaskExecutor()); //Uses threadpooltask executor, see below factory.setMaxMessagesPerTask(10); //Not required, but to reduce thread-switching. This will depend on your use case


    在使用 ThreadPool Executor 之前请阅读ThreadPool Executor,它写于大约 12 年前,但仍然有效。我没有在@JmsListener 级别设置并发,因为它会导致额外的复杂性JmsListener Concurrency

    【讨论】:

      猜你喜欢
      • 2015-12-05
      • 2019-10-07
      • 2013-06-16
      • 1970-01-01
      • 2012-08-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-01-31
      相关资源
      最近更新 更多