【问题标题】:What's the difference between SimpleMessageListenerContainer and DirectMessageListenerContainer in Spring AMQP?Spring AMQP 中的 SimpleMessageListenerContainer 和 DirectMessageListenerContainer 有什么区别?
【发布时间】:2019-10-19 16:26:25
【问题描述】:

Spring AMQP 中的SimpleMessageListenerContainerDirectMessageListenerContainer 有什么区别?我检查了他们的两个文档页面,SimpleMessageListenerContainer 几乎没有关于内部工作的解释,DirectMessageListenerContainer 有以下解释:

SimpleMessageListenerContainer 并不是那么简单。最近对 rabbitmq java 客户端的更改促进了一个更简单的侦听器容器,它直接在 rabbit 客户端消费者线程上调用侦听器。没有 txSize 属性 - 每条消息都是单独确认(或确认)的。

我真的不明白这些是什么意思。它说listener container that invokes the listener directly on the rabbit client consumer thread。如果是,那么SimpleMessageListenerContainer 是如何调用的?

我写了一个小应用程序并使用了DirectMessageListenerContainer,只是为了看看区别,我切换到SimpleMessageListenerContainer,但据我所知,RabbitMQ 端没有区别。从 Java 方面来看,区别在于方法(SimpleMessageListenerContainer 提供了更多)和日志(DirectMessageListenerContainer 记录了更多内容)

我想知道每个场景的使用情况。

【问题讨论】:

    标签: java rabbitmq spring-amqp spring-rabbit


    【解决方案1】:

    SMLC 为每个消费者(并发)提供一个专用线程,用于轮询内部队列。当一条新消息到达客户端线程上的消费者时,它被放入内部队列,消费者线程将其拾取并调用侦听器。这是早期版本的客户端提供多线程所必需的。对于较新的客户端,这不是问题,因此我们可以直接调用侦听器(因此得名)。

    除了txSize之外还有一些其他区别。

    Choosing a Container

    【讨论】:

      【解决方案2】:

      在 DirectMessageListenerContainer 中,一些逻辑被移到 AMQP 实现中,而不是像 SimpleMessageListenerContainer 那样的 ListenerContainer

      这就是 SimpleMessageListenerContainer 中的 Javadocs 对 setTxSize() 所说的 -

      /**
       * Tells the container how many messages to process in a single transaction (if the channel is transactional). For
       * best results it should be less than or equal to {@link #setPrefetchCount(int) the prefetch count}. Also affects
       * how often acks are sent when using {@link AcknowledgeMode#AUTO} - one ack per txSize. Default is 1.
       * @param txSize the transaction size
       */
      

      客户端在每次处理 txSize 个消息时发送一个 ack。这是在方法中控制的

      private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR
      
          Channel channel = consumer.getChannel();
      
          for (int i = 0; i < this.txSize; i++) {
      
              logger.trace("Waiting for message from consumer.");
              Message message = consumer.nextMessage(this.receiveTimeout);
              .
              .
      

      在较新的实现中,每条消息都直接在线程上确认并基于事务模型(Single 或 publisher confirms)消费者向 Rabbit MQ 发送确认

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-06-09
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-05-07
        • 2013-03-05
        • 1970-01-01
        相关资源
        最近更新 更多