【问题标题】:Consuming messages in parallel from ActiveMQ从 ActiveMQ 并行消费消息
【发布时间】:2020-01-17 07:59:19
【问题描述】:

每当我第一次将消息发布到队列时,消息被毫无问题地提取,但是当我删除第二个文件时,消息处于“待处理”状态,线程休眠时间(2 分钟)。为了测试在 ActiveMQ 中工作的并发性,我添加了名为 ThreadService 的 bean。

我在 JMSConfig.java 中有如下代码

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL("tcp://localhost:61616");
    connectionFactory.setPassword("admin");
    connectionFactory.setUserName("admin");
    connectionFactory.setTrustedPackages(Arrays.asList("com.jms.domain", "java.util"));
    connectionFactory.setMaxThreadPoolSize(1);
    return connectionFactory;
}

@Bean(destroyMethod = "stop", initMethod = "start")
@Primary
public PooledConnectionFactory pooledConnectionFactory(ConnectionFactory connectionFactory) {
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setConnectionFactory(connectionFactory);
    pooledConnectionFactory.setMaxConnections("8");
    pooledConnectionFactory.setMaximumActiveSessionPerConnection("10");
    return pooledConnectionFactory;
}

@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("1-5");
    return factory;
}

CamelRouter.java

from("file://E:/Camel")
    .bean(ThreadService)        
    .to("activemq:MessageQueue");

ThreadService.java

public void ThreadService throws Exception {
    Thread.sleep(120000);
}

如何在 ActiveMQ 中实现并发,使处于待处理状态的消息并行出列?

【问题讨论】:

    标签: java concurrency apache-camel jms activemq


    【解决方案1】:

    我很困惑,因为您的问题主题是关于消费,而您的路线正在生产到 ActiveMQ

    并行消费者

    如果您想并行消费来自 JMS 队列,您通常配置多个消费者。

    如果您想为个人消费者执行此操作,您可以将其附加到端点 URI

    from("activemq:queue:myQueue?concurrentConsumers=5"
    

    如果您想将此作为每个消费者的默认设置,您可以在您的 bean 设置中进行配置

    @Bean
    public JmsConfiguration jmsConfiguration() {
        JmsConfiguration jmsConfiguration = new JmsConfiguration();
        jmsConfiguration.setConnectionFactory(pooledConnectionFactory());
        jmsConfiguration.setConcurrentConsumers(5);
        return jmsConfiguration;
    }
    
    @Bean(name = "activemq")
    public ActiveMQComponent activeMq() {
        ActiveMQComponent activeMQComponent = new ActiveMQComponent();
        activeMQComponent.setConfiguration(jmsConfiguration()); 
        return activeMQComponent;
    }
    

    并行生产者

    嗯,您的 JMS 生产路线有一个文件使用者,它是按定义为单线程的,以避免处理具有多个使用者的同一个文件。

    但是,您可以在使用 Threads DSL of Camel 文件消费后将您的路由转为多线程

    from("file://E:/Camel")
        .threads(5) // continue asynchronous from here with 5 threads
        .bean(ThreadService)        
        .to("activemq:MessageQueue");
    

    像这样,ThreadService 中的“长时间运行的任务”不应再阻塞其他文件,因为 threads 语句中的 路由继续与 5 个线程异步。文件使用者保持单线程。

    但请注意! threads 语句中断当前事务。文件使用者将消息交给一个新线程。如果稍后发生错误,文件使用者看不到它。

    【讨论】:

    • 正如我上面提到的,camel 的代码是从我的本地文件夹路径中获取文件并检查并发性,我正在执行 Thread.sleep() 2 分钟,MessageQueue 等待 2从 Pending Messages 转移到 Dequeued 的分钟数...您能为这种情况提出一些建议吗,因为 ActiveMQ 是这里的消费者...
    • 你的意思是你想与ActiveMQ并行生产? ActiveMQ 是代理。您在其他地方有消费者吗?
    • Spring JMS 监听器是这里的消费者。 camel 是发布它使用文件组件获取的文件,Active MQ 是消息存储。如果我删除两个大文件,我想并行处理它们。有什么办法可以做到这一点?
    • 我已经扩展了我的答案,因为我想我现在更好地理解了你的问题
    • 谢谢 burki,但对我来说真正的问题不在于当我删除 2 个文件时,骆驼层本身并不存在第一个文件在收到文件后立即被消耗,但第二个文件需要 2 分钟才能接收因为我有 Thread.sleep() 2 分钟。因此,我不确定如何使上述工作并行工作,而不是等待当前按顺序方式处理的消息。添加了一张图片以显示在等待期间看到的内容。
    猜你喜欢
    • 2018-04-21
    • 2015-07-16
    • 2015-11-25
    • 1970-01-01
    • 2021-03-06
    • 2021-04-02
    • 2018-12-12
    • 1970-01-01
    • 2015-02-02
    相关资源
    最近更新 更多