【问题标题】:Spring and Rabbit mq message orderSpring 和 Rabbit mq 消息顺序
【发布时间】:2018-03-02 14:21:03
【问题描述】:

我正在开发一个应用程序,它接收按会话 ID 分组的剩余消息(会话 1 可以由 2 条消息组成,会话 2 可以由 10 条消息组成)并将它们发送到数据库。给定会话的消息内部具有相同的会话 ID。

对于给定的会话,第一条消息应该首先发送到数据库,然后是第二条,依此类推。在会话中顺序非常重要。

会话的顺序并不重要,我们可以混合它们的信息,例如我们可以按此顺序将消息发送到数据库:

  • 会话 A 消息 1
  • 会话 B 消息 1
  • 会话 A 消息 2
  • 会话 C 消息 1
  • 会话 B 消息 2
  • 会话 A 消息 3
  • 会话 C 消息 2

我创建了 10 个 rabbitmq 队列。应用程序根据会话 ID 选择队列:来自给定会话的所有消息都在同一个队列中。

每个队列有1个消费者,所以在同一个队列中的顺序是有保证的。

出于性能原因(以及流量增长),我们必须将队列数量设置得更高(节点创建 100 个队列)或部署应用程序的其他实例(10 个节点,每个队列上有 1 个消费者 - 所以 10 个消费者每个队列)。

将队列数设置得更高并不难,但我这样做的方式有点难看并且有代码重复(见下文)。我需要建议以使其变得更好(而且我们需要 1000 个队列)。

如果我们部署 10 个节点而不是 1 个,则每个队列将有 10 个消费者,并且队列中消息的顺序将无法保证(因此来自会话 A 的消息 2 可以在来自会话 A 的 msg 1 之前发送到数据库)。

首选的解决方案是 10 个节点,因为我们可以使其动态化,并且我们可以在需要时在 docker 中启动/停止节点。

这是我使用的依赖项:

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>1.6.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.6.3.RELEASE</version>
    </dependency>

这是兔子的配置:

@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setPrefetchCount(50);
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    return factory;
}
@Bean
public ConnectionFactory connectionFactory() {
    String addresses = "address1,address2";
    com.rabbitmq.client.ConnectionFactory rabbitConnection = new com.rabbitmq.client.ConnectionFactory();
    rabbitConnection.setAutomaticRecoveryEnabled(true);
    rabbitConnection.setUsername("username");
    rabbitConnection.setPassword("password");
    rabbitConnection.setVirtualHost("virtualHost");
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnection);
    connectionFactory.setPublisherConfirms(true);
    connectionFactory.setAddresses(addresses);
    connectionFactory.setChannelCacheSize(100);
    return connectionFactory;
}

目前,我用 10 个类创建了 10 个队列。这是一个队列示例:

@Component
@RabbitListener(containerFactory = "myRabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "queue2", durable = "true"), exchange = @Exchange(type = "topic", value = "exchange2", durable = "true"), key = "key2"))
public class QueueGroup2Listener {
    @RabbitHandler
    public void processOrder(RequestMessage received) throws DataAccessResourceFailureException {
        process(received);
    }
}

我没有找到比在注释中使用不同值(从 1 到 10)创建 10 次此类更好的方法。

问题是: 如何在队列中添加消费者并保证给定会话中的消息顺序?我的意思是队列中有 10 个消费者。消费者 A 使用来自会话 A 的消息 1,因此其他消费者不应使用来自会话 A 的其他消息。

额外问题是: 如何使队列创建优于每个队列 1 个类?

非常感谢

更新

这个问题的答案可以帮助我很多 RabbitMQ : Create Dynamic queues in Direct Exchange:我可以为每个会话创建一个队列(在这种情况下,下一个问题是 rabbitmq 可以同时管理多少个队列?)

加里回答后更新

感谢您的回复,我尝试了以下方法,但应用程序启动消费者的时间很长:

@Bean
public QueueMessageListener listener() {
    return new QueueMessageListener();
}


@Bean(name="exchange")
public Exchange exchange() {
    TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    MessageListenerAdapter adapter = new MessageListenerAdapter(listener(), "processOrder");
    container.setMessageListener(adapter);
    admin().declareExchange(exchange);
    createQueues(exchange, QUEUE, numberOfQueues, BINDING_KEY, container, null, true);
    container.start();  // very very very long 
    return exchange;
}

private void createQueues(Exchange exchange, String queuePrefix, int numberOfQueues, String bindingPrefix,
        SimpleMessageListenerContainer container, Map<String, Object> args) {
    int length = 1;
    if(numberOfQueues > 1) {
        length = (int)(Math.log10(numberOfQueues - 1) + 1);
    }
    for (int i = 0; i < numberOfQueues; i++) {
        Queue queue = new Queue(queuePrefix + String.format("%0" + length + "d", i), true, false, false, args);
        container.addQueues(queue);
        admin().declareQueue(queue);
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(bindingPrefix + i).noargs();
        admin().declareBinding(binding);
    }
}

如果我不调用 start 函数,则不会创建消费者。

【问题讨论】:

    标签: java rabbitmq spring-amqp spring-rabbit


    【解决方案1】:

    您可以通过编程方式启动SimpleMessageListenerContainers,而不是使用声明性范例。

    您还可以使用RabbitAdmin 以编程方式声明队列、绑定等。

    Configuring the Broker

    由于 Spring AMQP 缓存通道,因此不能保证两个发送将发生在同一个通道上(这引入了非常小的顺序丢失的可能性);为确保顺序,您需要在即将发布的 2.0 版本中使用新的RabbitTemplate.invoke() 方法。它将在同一通道上的调用范围内执行发送,因此可以保证顺序。

    如果您的发送代码是单线程的,这不是问题,因为在这种情况下将始终使用相同的通道。

    【讨论】:

      猜你喜欢
      • 2018-07-30
      • 2023-01-31
      • 1970-01-01
      • 2020-06-24
      • 2020-03-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-11-01
      相关资源
      最近更新 更多