【Question title】: Rabbit SimpleMessageListenerContainer won't shut down
【Posted】: 2018-03-21 04:28:58
【Question description】:

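Following on from this question, we have a situation where the Rabbit credentials become invalid, and we need to call resetConnection() on the CachingConnectionFactory to pick up a new set of credentials.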

We do this in a ShutdownSignalException handler, and it basically works. What doesn't work is that we also need to restart our listeners. We have several like this:

@RabbitListener(
    id = ABC,
    bindings = @QueueBinding(value = @Queue(value="myQ", durable="true"),
        exchange = @Exchange(value="myExchange", durable="true"),
        key = "myKey"),
    containerFactory = "customQueueContainerFactory"
)
public void process(...) {
   ...
}

This answer (and also this one) gives the impression that we only need to do:

@Autowired RabbitListenerEndpointRegistry registry;
@Autowired CachingConnectionFactory connectionFactory;

@Override
public void shutdownCompleted(ShutdownSignalException cause) {
    refreshRabbitMQCredentials();
}

public void refreshRabbitMQCredentials() {
    registry.stop();  // do this first

    // Fetch credentials, update username/pass
    connectionFactory.resetConnection(); // then this

    registry.start();  // finally restart
}

The problem is this. After debugging through SimpleMessageListenerContainer, I found that when the first of these containers calls doShutdown(), Spring tries to cancel the BlockingQueueConsumer.

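The underlying Channel still reports as open, even though the RabbitMQ UI doesn't show any connections or channels as open. Because of this, a Cancel is sent to the broker inside ChannelN.basicCancel(). The channel then blocks forever waiting for the reply, and as a result the container shutdown is blocked completely.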

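I tried injecting a TaskExecutor (Executors.newCachedThreadPool()) into the containers and calling shutdownNow() on it or interrupting its threads. None of this has any effect on the channel's blocking wait.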

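It seems my only option for unblocking the channel is to trigger an additional ShutdownSignalException during the cancel. However, (a) I don't know how to do that, and (b) it looks like I'd have to do it in parallel (starting the cancellation of all the listeners before attempting to shut down again).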

// com.rabbitmq.client.impl.ChannelN
@Override
public void basicCancel(final String consumerTag) throws IOException
{
    // [snip]

    rpc(new Basic.Cancel(consumerTag, false), k);

    try {
        k.getReply(); // <== BLOCKS HERE
    } catch(ShutdownSignalException ex) {
        throw wrap(ex);
    }
    metricsCollector.basicCancel(this, consumerTag);
}

I'm not sure why this is so difficult. Is there a simpler way to force a SimpleMessageListenerContainer to shut down?
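One generic way to stop the caller from hanging forever is to run each container's stop on its own thread and give up waiting after a timeout. This also covers point (b), starting all the cancellations in parallel. The sketch below is plain Java, and ParallelStopper and stopAll are hypothetical names. It does not unblock the stuck basicCancel(); it only bounds how long the caller waits and reports which stop actions are still running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: stop several components in parallel with a bounded wait.
public class ParallelStopper {

    /**
     * Starts every stop action at once on its own daemon thread and waits at most
     * timeoutMillis in total. Returns the indexes of the actions that had not
     * finished by then; those keep running in the background.
     */
    public static List<Integer> stopAll(List<Runnable> stopActions, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r, "parallel-stop");
            t.setDaemon(true); // a stuck stop must not keep the JVM alive
            return t;
        });
        List<Future<?>> futures = new ArrayList<>();
        for (Runnable action : stopActions) {
            futures.add(pool.submit(action));
        }
        pool.shutdown(); // no new tasks; the submitted ones keep running
        pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);

        List<Integer> stuck = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            if (!futures.get(i).isDone()) {
                stuck.add(i);
            }
        }
        return stuck;
    }
}
```

In a Spring application, each Runnable might wrap `container::stop` for every container returned by `RabbitListenerEndpointRegistry.getListenerContainers()`. That wiring is an assumption and has not been tried against Spring Rabbit 1.7.6.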


Using Spring Rabbit 1.7.6; AMQP client 4.0.3; Spring Boot 1.5.10.RELEASE


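Update

Some logs that support the theory that the message containers restart before the connection refresh has completed, which may be why they don't reconnect: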

ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO  u.c.c.c.r.ReauthenticatingChannelListener - Channel shutdown: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO  u.c.c.c.r.ReauthenticatingChannelListener - Channel closed with reply code 403. Assuming credentials have been revoked and refreshing config server properties to get new credentials. Cause: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
WARN  u.c.c.c.r.ReauthenticatingChannelListener - Shutdown signalled: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO  u.c.c.c.r.RabbitMQReauthenticator - Refreshing Rabbit credentials for XXXXXXXX
INFO  o.s.c.c.c.ConfigServicePropertySourceLocator - Fetching config from server at: http://localhost:8888/configuration
INFO  u.c.c.c.r.ReauthenticatingChannelListener - Got ListenerContainerConsumerFailedEvent: Consumer raised exception, attempting restart
INFO  o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer@2db55dec: tags=[{amq.ctag-ebAfSnXLbw_W1hlZ5ag7sQ=consumer.myQ}], channel=Cached Rabbit Channel: AMQChannel(amqp://cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4@127.0.0.1:5672/,2), conn: Proxy@12de62aa Shared Rabbit Connection: SimpleConnection@56c95789 [delegate=amqp://cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4@127.0.0.1:5672/, localPort= 50052], acknowledgeMode=AUTO local queue size=0
INFO  o.s.c.c.c.ConfigServicePropertySourceLocator - Located environment: name=myApp, profiles=[default], label=null, version=null, state=null
INFO  com.zaxxer.hikari.HikariDataSource - XXXXXXXX - Shutdown initiated...
INFO  com.zaxxer.hikari.HikariDataSource - XXXXXXXX - Shutdown completed.
INFO  u.c.c.c.r.RabbitMQReauthenticator - Refreshed username: 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4' => 'cert-configserver-d7b54af2-0735-a9ed-7cc4-394803bf5e58'
INFO  u.c.c.c.r.RabbitMQReauthenticator - CachingConnectionFactory reset, proceeding...

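Update 2:

This looks like some kind of race condition. I removed the container stop/start. I then added a thread-only breakpoint in SimpleMessageListenerContainer.restart() so that resetConnection() could race past it. When I release the breakpoint, I can see things start to recover: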

16:18:47,208 INFO  u.c.c.c.r.RabbitMQReauthenticator - CachingConnectionFactory reset
// Get ready to release the SMLC.restart() breakpoint...
16:19:02,072 INFO  o.s.a.r.c.CachingConnectionFactory - Attempting to connect to: rabbitmq.service.consul:5672
16:19:02,083 INFO  o.s.a.r.c.CachingConnectionFactory - Created new connection: connectionFactory#7489bca4:1/SimpleConnection@68546c13 [delegate=amqp://cert-configserver-132a07c2-94f3-0099-4de1-f0b1a9875d5a@127.0.0.1:5672/, localPort= 33350]
16:19:02,086 INFO  o.s.amqp.rabbit.core.RabbitAdmin - Auto-declaring a non-durable, auto-delete, or exclusive Queue ...
16:19:02,095 DEBUG u.c.c.c.r.ReauthenticatingChannelListener - Active connection check succeeded for channel AMQChannel(amqp://cert-configserver-132a07c2-94f3-0099-4de1-f0b1a9875d5a@127.0.0.1:5672/,1)
16:19:02,120 INFO  o.s.amqp.rabbit.core.RabbitAdmin - Auto-declaring a non-durable, auto-delete, or exclusive Queue (springCloudBus...

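In that case, I now have to work out how to delay the container restart until the refresh has completed (i.e. until my ShutdownSignalException handler finishes), or somehow block the refresh...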
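One way to delay the restart until the refresh has finished is a shared gate. The refresh holds a write lock, and each consumer restart takes a read lock, so a restart that arrives mid-refresh waits. This only helps if the refresh acquires the gate before the restart does. The Update logs suggest that ordering: "Refreshing Rabbit credentials" is logged before "Restarting Consumer". The sketch below is plain Java, and RefreshGate, refresh and restart are hypothetical names. How to hook restart() into SimpleMessageListenerContainer's restart path is not shown and is an assumption.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical coordination point between a credential refresh and consumer restarts.
public class RefreshGate {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Runs the refresh exclusively; restarts that arrive meanwhile are held back. */
    public void refresh(Runnable refreshAction) {
        lock.writeLock().lock();
        try {
            refreshAction.run();
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Runs a restart once no refresh holds the gate. Returns false, without
     * running the restart, if a refresh is still in progress after the timeout.
     */
    public boolean restart(Runnable restartAction, long timeoutMillis) throws InterruptedException {
        if (!lock.readLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false;
        }
        try {
            restartAction.run();
            return true;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

A read/write lock is used rather than a single mutex so that several consumers can restart concurrently once no refresh is running.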


Update 3:

My overall problem (of which this is a symptom) was solved by: https://stackoverflow.com/a/49392990/954442

【Question discussion】:

    Tags: java rabbitmq spring-amqp


    【Solution 1】:

    It's not at all clear why the channel would report as open. This works fine for me, and it recovers after I delete the user foo...

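    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication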
    public class So49323291Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So49323291Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(RabbitListenerEndpointRegistry registry, CachingConnectionFactory cf,
                RabbitTemplate template) {
            return args -> {
                cf.setUsername("foo");
                cf.setPassword("bar");
                registry.start();
                doSends(template);
                registry.stop();
                cf.resetConnection();
                cf.setUsername("baz");
                cf.setPassword("qux");
                registry.start();
                doSends(template);
            };
        }
    
        public void doSends(RabbitTemplate template) {
            while (true) {
                try {
                    template.convertAndSend("foo", "Hello");
                    Thread.sleep(5_000);
                }
                catch (Exception e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    
        @RabbitListener(queues = "foo", autoStartup = "false")
        public void in(Message in) {
            System.out.println(in);
        }
    
    }
    

    (Body:'Hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo , deliveryTag=4, consumerTag=amq.ctag-9zt3wUGYSJmoON3zw03wUw, consumerQueue=foo])

    2018-03-16 11:24:01.451 ERROR 11867 --- [127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'foo' deleted, class-id=0, method-id=0)

    ...

    Caused by: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

    2018-03-16 11:24:01.745 ERROR 11867 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer

    2018-03-16 11:24:03.740 INFO 11867 --- [cTaskExecutor-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2c4d1ac:3/SimpleConnection@5e9c036b [delegate=amqp://baz@127.0.0.1:5672/, localPort= 59346]

    (Body:'Hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo , deliveryTag=1, consumerTag=amq.ctag-ljnY00TBuvy5cCAkpD3r4A, consumerQueue=foo])

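    However, you really don't need to stop/start the registry. Just reconfigure the connection factory with the new credentials and call resetConnection(); the containers will recover.

    【Discussion】: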
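The advice above relies on the connection factory opening its shared connection lazily. After resetConnection(), the next consumer that asks for a connection gets a fresh one, built from whatever credentials are set at that moment. The toy model below illustrates that behaviour. LazyConnectionModel and FakeConnection are hypothetical classes, not Spring AMQP's implementation. The model also shows why the race described in the question matters: a consumer that reconnects before the new credentials are in place still picks up the old user.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy model of a factory that caches one shared connection until reset.
public class LazyConnectionModel {

    /** Stand-in for a broker connection; records only the user it was opened with. */
    public static final class FakeConnection {
        public final String user;
        public final int id;

        FakeConnection(String user, int id) {
            this.user = user;
            this.id = id;
        }
    }

    private final AtomicInteger created = new AtomicInteger();
    private volatile String username;
    private FakeConnection cached; // guarded by "this"

    public LazyConnectionModel(String username) {
        this.username = username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    /** Drops the cached connection; nothing is reopened until someone asks for one. */
    public synchronized void resetConnection() {
        cached = null;
    }

    /** What a (re)starting consumer calls: reuse the cache, or open with the current user. */
    public synchronized FakeConnection createConnection() {
        if (cached == null) {
            cached = new FakeConnection(username, created.incrementAndGet());
        }
        return cached;
    }
}
```

In this model, the order of resetConnection() and the credential update does not matter, as long as both happen before the next createConnection(). The answer's example resets first and sets credentials afterwards.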

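    • Thanks. I was hoping I wouldn't have to stop/start, but the containers don't recover at all. I wonder whether this is because the consumers restart before the CachingConnectionFactory has had its new credentials set. Also, if I turn Spring AMQP logging down to DEBUG, it still seems to be picking up the old details. There doesn't seem to be any kind of "reset" happening there.
    • Seems to be a race condition that affects me but not you.
    • I tested with 2.0.2, but I don't remember any changes that would affect this. I'll try 1.7.x next week. Do you see different behavior if you set possibleAuthenticationFailureFatal to false on the container?
    • Thanks for looking. I tested it, and that flag makes no difference here. It really does look like a misordering between the container and the ShutdownSignalException handler. I could only try to reorder things, so that the refresh happens before the container restarts, if I can share some state between the threads...
    • I just tested with 1.7.6 with no problems. If you can put together a small example that exhibits the problem, I'll find out what's wrong.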