【问题标题】:Messages marked as Deferred when consuming Azure Service Bus messages with Spring JMS and qpid使用 Spring JMS 和 qpid 使用 Azure 服务总线消息时标记为延迟的消息
【发布时间】:2017-01-03 15:44:45
【问题描述】:

我可以很好地接收来自我的服务总线订阅的消息,但是当我的侦听器中发生异常时,似乎最终会向服务总线发送一个 state=Modified 和 undeliverableHere=true 的 Disposition 框架。服务总线的docs 表示它不支持 amqp 修改配置。

消息在服务总线中以延迟状态结束,我不知道如何将消息轻推回活动状态。

JMS 配置:

@Bean
public ConnectionFactory jmsConnectionFactory(MessageStoreProperties properties) throws UnsupportedEncodingException {
    JmsConnectionFactory connectionFactory = new JmsConnectionFactory(properties.getUrlString());
    connectionFactory.setClientID(clientId);
    connectionFactory.setUsername(properties.getUsername());
    connectionFactory.setPassword(properties.getPassword());
    connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
    return new CachingConnectionFactory(connectionFactory);
}

@Bean
public JmsDefaultRedeliveryPolicy redeliveryPolicy() {
    JmsDefaultRedeliveryPolicy policy = new JmsDefaultRedeliveryPolicy();
    policy.setMaxRedeliveries(50);
    return policy;
}

@Bean
public JmsListenerContainerFactory topicContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setSubscriptionDurable(true);
    factory.setPubSubDomain(true);
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

听众:

@JmsListener(destination = "crm-customer-event/subscriptions/test-sub", containerFactory = "topicContainerFactory")
public void receiveCustomerEvent(@Payload ExecutionContextDTO dto) {
    logger.debug("Got payload: " + dto);
    throw new RuntimeException("Oops");
}

这是我在日志中看到的:

消息传输开始

[299309872:1] <- Transfer{handle=0, deliveryId=0, deliveryTag=\x97\x1c\x87\x04\xb7\xea\x86@\xb3n\xbd\x9fg\x81\x00\x11, messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=true} (16695) "\x00Sp\xc0\x0a\x05@@p.........

抛出异常,然后 JMS 在本地重新传递消息 50 次(为什么要这样做?)

之后我看到的下一个 amqp 协议帧是

[299309872:1] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Modified{deliveryFailed=true, undeliverableHere=true, messageAnnotations=null}, batchable=false}

在我看来,最后一个 Disposition 帧导致消息进入 Deferred 状态。消息上还有一个锁定令牌。即使 TTL 通过了,消息仍然停留在订阅中,并且通过 REST api 对它进行任何处理都无济于事。我已经尝试解锁它(使用 PUT)并删除它(使用 DELETE)。我也试过用 REST api 接收它(PeekLock 和接收和删除品种),看起来它们不存在。我在订阅中设置了选项以在过期后自动将消息移动到死信队列,并且它们永远不会移动。

来自 qpid-jms 的使 ack 发生的代码是 here,看来库的这一部分并不打算扩展,否则我会制作自己的实现来返回不同的 ack。

如何获取 qpid/JMS 到

  1. 使用 FAILED 确认而不是 MODIFIED_FAILED_UNDELIVERABLE 确认
  2. 清除那些延迟消息。

【问题讨论】:

    标签: azureservicebus spring-jms qpid


    【解决方案1】:

    首先,JMS 规范正确地指出,从 MessageListener 回调中抛出异常实际上是一个应用程序编程错误,您的应用程序应该自己处理这些错误。

    其次,客户端使用正确的配置来指示消息无法传递给该客户端,遥控器应该支持 AMQP 1.0 规范中列出的所有配置,我会联系微软并要求他们实施规范接近一点了。

    要解决上述问题,您可以在客户端确认模式下接收消息,当您拦截正在引发的异常时,您可以使用this JIRA Issue 中描述的机制配置消息确认。

    代码如下所示:

    message.setIntProperty("JMS_AMQP_ACK_TYPE", 2);
    message.acknowledge();
    

    模式定义为:

    ACCEPTED = 1;
    REJECTED = 2;
    RELEASED = 3;
    MODIFIED_FAILED = 4;
    MODIFIED_FAILED_UNDELIVERABLE = 5;
    

    【讨论】:

    • 谢谢!我会直接告诉微软让他们一起行动;)
    猜你喜欢
    • 1970-01-01
    • 2017-03-09
    • 1970-01-01
    • 1970-01-01
    • 2020-06-11
    • 2014-09-27
    • 2022-08-10
    • 1970-01-01
    • 2018-01-19
    相关资源
    最近更新 更多