【问题标题】:How to send NACK to Solace queue from Solace listener using JMS API?如何使用 JMS API 从 Solace 侦听器向 Solace 队列发送 NACK?
【发布时间】:2021-11-20 12:08:18
【问题描述】:

需要您的帮助才能找到解决方案。当前实施细节:

SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory(); // for create connection factory using host , vpn,trust store,keystore with auth scheme AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE
this.connection = connectionFactory.createConnection(); // connection creation
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // session creation using client acknowledge

使用 MessageListener 监听队列

public class MyListener implements MessageListener 

    public void onMessage(Message message) // receive the message and process
    { 
        /* in this method validating the message (poison message) 
         * and publish to kafka, once receive the success message
         * from kafka acknowledge the message
         */
        message.acknowledge();
    }

问题:如果 Kafka 代理宕机了如何重试?我们与内部 Solace 团队进行了讨论。

我们假设如果客户端没有调用message.acknowledge()Solace 将进行重试,但内部 Solace 团队澄清它正在使用窗口机制,所以如果后续消息是确认,那么之前的消息也会确认并删除。

他们的建议是发回失败消息的 NACK(否定确认),然后 Solace 可以重试该消息。如何使用 Java API 发送 NACK?

Max_Un_Ack_MessageMax_Redeliver_Count 的建议值是多少?

Solace 的 Maven 依赖项:

<dependency>
    <groupId>com.solacesystems</groupId>
    <artifactId>sol-jms</artifactId>
    <version>10.0.0</version>
</dependency>

【问题讨论】:

    标签: java jms solace retry-logic


    【解决方案1】:

    如果您使用 JMS API 并且想要触发重新投递,那么您有几个选择。

    您可以使用transacted session。当处理消息成功时,您确认消息和commit()javax.jms.Session。当处理不成功时,您在javax.jms.Session 上致电rollback()

    或者,您可以使用javax.jms.Session.recover()。 JavaDoc 是这样说的:

    在此会话中停止消息传递,并使用最旧的未确认消息重新开始消息传递。

    所有消费者都按顺序传递消息。确认收到的消息会自动确认已传递给客户端的所有消息。

    重新启动会话会导致它执行以下操作:

    • 停止消息传递
    • 将所有可能已送达但未确认的邮件标记为“已重新送达”
    • 重新启动传递序列,包括以前传递的所有未确认消息。重新递送的邮件不必完全按照其原始递送顺序递送。

    当然,这假设 Solace JMS 客户端实际上实现了此处的指定行为。

    【讨论】:

    • 感谢您的建议。我正在使用 connection.session.recover(); --> 用于安慰消费者的下游故障它不断重试无法处理的消息。这个rec​​over() 方法适合用于生产场景吗?(运行多个消费者实例)使用这个方法有什么瓶颈吗?重试消息不具有可声明为重试消息的属性。(重试计数或重新传递计数)。
    【解决方案2】:

    根据here 的讨论,message.acknowledge() 与窗口无关。由于您在示例中使用了异步解决方案,如果您不调用 stop(),则传输窗口将关闭并且消息将不会被确认。

    message.acknowledge() 仅用于确认消息已被接收和消费,其中消息已从 Solace 代理的持久存储中删除。请注意,未确认的消息只会在下一个要连接的消费者时重新传递。

    【讨论】:

      猜你喜欢
      • 2015-08-25
      • 1970-01-01
      • 2021-03-29
      • 2019-04-17
      • 2020-05-27
      • 1970-01-01
      • 2017-03-16
      • 2018-04-07
      • 2016-05-20
      相关资源
      最近更新 更多