【Question title】: SimpleMessageListenerContainer - recover from failure
【Posted】: 2017-11-17 19:27:47
【Question】:

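I want to shut down a SimpleMessageListenerContainer (created programmatically, not as a bean) on any possible error (missing queue, connection problems, etc.) and create a new one at runtime, redeclaring the queue in the process.

I use Helix for partition management, with one listener per partition. Another possibility is to reuse the existing SimpleMessageListenerContainer (rather than always creating a new one), but in that case I need to retry queue redeclaration and rebinding in case of any failure.

Also, there seem to be different kinds of exceptions: fatal (for example, the queue is deleted at runtime) and non-fatal (connection lost). How can both cases be handled?

Which of these two options is easier?

UPDATE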

private Map<SimpleMessageListenerContainer, AtomicBoolean> shuttingDown = new ConcurrentHashMap<>();

@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent listenerContainerConsumerFailedEvent) {

    boolean fatal = listenerContainerConsumerFailedEvent.isFatal();
    SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)listenerContainerConsumerFailedEvent.getSource();

    if(fatal){
        AtomicBoolean sd = shuttingDown
                .computeIfAbsent(listenerContainer, v -> new AtomicBoolean(false));
        if(sd.compareAndSet(false, true)) {
            System.out.println("RECREATING");
            String[] qn = listenerContainer.getQueueNames();
            String q = qn[0];
            recreateQueue(q);
            listenerContainer.stop();
            listenerContainer.start();
            //delete from shuttingDown ?
        }
        else{
            System.out.println("RECREATING_NOT");
        }
    }
    else{
        System.out.println("NON_FATAL");
    }
}

and the output:

NON_FATAL
NON_FATAL
NON_FATAL
NON_FATAL
22:36:44.044 [SimpleAsyncTaskExecutor-7] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Consumer received fatal=false...\
...

RECREATING
RECREATING_NOT
RECREATING_NOT
RECREATING_NOT
22:36:44.057 [SimpleAsyncTaskExecutor-6] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Stopping container from aborted consumer

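【Comments on the question】:

  • What is the reason for recreating the SimpleMessageListenerContainer bean on failure? Doesn't Spring AMQP include recovery/reconnection functionality?
  • @KrzysztofTomaszewski The issue is about shutting down the container and starting it later (not immediately).
  • That's clear to me. I'm curious whether this is something that is typically needed after a RabbitMQ broker failure?
  • @KrzysztofTomaszewski I need this for partition changes: shutting down the simple message listener when another component takes over the partition. In case of failure, Spring will reconnect automatically.
  • OK. I just have a case where a SimpleMessageListenerContainer somehow silently (no error log message) disconnected from the RabbitMQ broker and stopped receiving messages that were present in the queue. Still digging...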

Tags: java rabbitmq spring-integration spring-amqp helix


【Answer 1】:

Add an ApplicationEventPublisher to the container. ListenerContainerConsumerFailedEvents have a fatal boolean property.

EDIT

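import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;

@SpringBootApplication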
public class So47357940Application {

    public static void main(String[] args) {
        SpringApplication.run(So47357940Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(AmqpAdmin admin) {
        return args -> admin.deleteQueue("so47357940");
    }

    @RabbitListener(queues = "so47357940")
    public void listen(String in) {
        System.out.println(in);
    }

    private final Map<SimpleMessageListenerContainer, AtomicBoolean> shuttingDown = new ConcurrentHashMap<>();

    @Bean
    public ApplicationListener<ListenerContainerConsumerFailedEvent> failures(AmqpAdmin admin,
            RabbitTemplate template) {
        return event -> {
            if (event.isFatal()) {
                SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
                AtomicBoolean sd = this.shuttingDown.computeIfAbsent(container, v -> new AtomicBoolean());
                if (sd.compareAndSet(false, true)) {
                    System.out.println("RECREATING");
                    String[] qn = container.getQueueNames();
                    String q = qn[0];
                    admin.declareQueue(new Queue(q));
                    // better to use a shared exec
                    ExecutorService exec = Executors.newSingleThreadExecutor();
                    exec.execute(() -> {
                        while (container.isRunning()) {
                            // should probably give up at some point
                            try {
                                Thread.sleep(100);
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                        container.start();
                        template.convertAndSend("so47357940", "foo");
                        this.shuttingDown.remove(container);
                    });
                }
                else {
                    System.out.println("RECREATING_NOT");
                }
            }
            else {
                System.out.println("NON_FATAL");
            }
        };
    }

}

Here is the debug log I get...

RECREATING
2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)
2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$144/1094003461 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'so47357940'
2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Cancelling Consumer@3a813488: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer   : Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Closing cached Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
2017-11-17 17:38:53.903 ERROR 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2017-11-17 17:38:53.903 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Shutting down Rabbit listener container
2017-11-17 17:38:53.903  INFO 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2017-11-17 17:38:53.903  INFO 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2017-11-17 17:38:54.003 DEBUG 42372 --- [pool-4-thread-1] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
2017-11-17 17:38:54.004 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@3a2547b8: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.005 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Started on queue 'so47357940' with tag amq.ctag-3wMG_13-68ibLL05ir3ySA: Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.005 DEBUG 42372 --- [ool-1-thread-11] o.s.a.r.listener.BlockingQueueConsumer   : ConsumeOK : Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,4)
2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda$146/1108520685 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message (Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=3, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [], routingKey = [so47357940]
2017-11-17 17:38:54.012 DEBUG 42372 --- [ool-1-thread-12] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.012 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Received message: (Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=so47357940, deliveryTag=1, consumerTag=amq.ctag-3wMG_13-68ibLL05ir3ySA, consumerQueue=so47357940])
2017-11-17 17:38:54.015 DEBUG 42372 --- [cTaskExecutor-3] .a.r.l.a.MessagingMessageListenerAdapter : Processing [GenericMessage [payload=foo, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=so47357940, amqp_contentEncoding=UTF-8, amqp_deliveryTag=1, amqp_consumerQueue=so47357940, amqp_redelivered=false, id=b614d9e6-1744-b600-7d86-ca9c51ad5844, amqp_consumerTag=amq.ctag-3wMG_13-68ibLL05ir3ySA, contentType=text/plain, timestamp=1510958334014}]]
foo
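
The two comments in the listener code above ("better to use a shared exec" and "should probably give up at some point") could be handled roughly like this. It is only a stdlib sketch, not part of Spring AMQP: `RestartWhenStopped`, `schedule`, and the timeout and poll parameters are hypothetical names chosen for illustration. The idea is to poll on one shared background thread, stop waiting after a deadline or an interrupt, and call the restart action only if the container really stopped.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not Spring AMQP API): restart once a container has stopped. */
final class RestartWhenStopped {

    // One shared single-thread executor instead of a new one per failure event.
    // The thread is a daemon so it never keeps the JVM alive.
    private static final ExecutorService EXEC = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "container-restart");
        t.setDaemon(true);
        return t;
    });

    private RestartWhenStopped() {
    }

    /**
     * Polls isRunning until it returns false or timeoutMillis elapses, then runs
     * restart only if the container stopped. The Future yields true when restart
     * ran, and false on timeout or interrupt.
     */
    static Future<Boolean> schedule(BooleanSupplier isRunning, Runnable restart,
            long timeoutMillis, long pollMillis) {
        return EXEC.submit(() -> {
            long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            while (isRunning.getAsBoolean()) {
                if (System.nanoTime() - deadline >= 0) {
                    return false; // give up: the container never stopped
                }
                try {
                    Thread.sleep(pollMillis);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false; // stop waiting instead of spinning
                }
            }
            restart.run();
            return true;
        });
    }
}
```

Inside the event listener this would presumably be called as `RestartWhenStopped.schedule(container::isRunning, container::start, 10_000, 100)`, where the 10-second limit is an arbitrary assumption to tune for your broker.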

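【Comments】:

  • Thanks for the prompt reply. I have tried this approach, but I have a problem with this method being invoked multiple times: there are at least 6 invocations on failure. In that case, is it enough to just recreate the binding and call start() on the same ListenerContainer to make it work again?
  • Yes, you can restart a failed container once the fatal failure has been resolved.
  • Could you check the updated code in the question above? It seems I still only get "Stopping container from aborted consumer" and there is no listener, although the queue is actually recreated.
  • You can't do the stop/start on the listener thread; you also don't need the stop(); it is better to wait until the container stops itself. See my edit.
  • I guess it's a race condition, and the start() doesn't do anything because the listener thread is still active. We should probably take a look at that to see whether we can at least log an error if you try to start() the container on the dying listener thread: AMQP-785.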
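
The repeated invocations mentioned in the first comment (several failure events for a single failure) are what the `shuttingDown` map in the question and answer guards against. A minimal stdlib sketch of that guard, pulled out on its own, might look like the following; `RecoveryGuard`, `tryAcquire`, and `release` are hypothetical names, not Spring AMQP API. Only the first caller per key wins, and the flag is cleared after recovery so a later fatal failure can trigger recovery again, which also answers the "delete from shuttingDown ?" question in the asker's code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: lets only one recovery run per key at a time. */
final class RecoveryGuard<K> {

    private final Map<K, AtomicBoolean> inProgress = new ConcurrentHashMap<>();

    /** Returns true for the first caller, and false for later callers until release(key). */
    boolean tryAcquire(K key) {
        return inProgress.computeIfAbsent(key, k -> new AtomicBoolean())
                .compareAndSet(false, true);
    }

    /** Clears the flag so that a later fatal failure can start a new recovery. */
    void release(K key) {
        inProgress.remove(key);
    }
}
```

In the listener, the container itself would be the key: call `tryAcquire(container)` when a fatal event arrives, and `release(container)` on the background thread after `container.start()` has been called.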