【问题标题】:Multithreaded JMS code : CLIENT_ACKNOWLEDGE or transacted session多线程 JMS 代码:CLIENT_ACKNOWLEDGE 或事务处理会话
【发布时间】:2011-04-19 23:41:06
【问题描述】:

已编辑问题:我正在开发多线程 JMS 接收器和发布器代码(独立的多线程 Java 应用程序)。 MOM 是 MQSonic。 从队列接收 XML 消息,调用存储过程(执行需要 70 秒)并在 90 秒内将响应发送到主题。 当代理关闭或应用程序处于预定关闭状态时,我需要处理一个条件。即从 Queue 接收消息并在 java 中处理消息的情况,同时 Queue 和 Topic 都将关闭。然后要处理那些不在队列中且未发送到主题但在 java 内存中的消息,我有以下选项:

(1) 将 CLIENT_ACKNOWLEDGE 会话创建为: 连接.createSession(假,javax.jms.Session.CLIENT_ACKNOWLEDGE) 这里我只会在事务(存储过程)成功完成后才确认消息

(2) 使用事务处理会话,即 connection.createSession(true, -1)。在这种方法中,由于事务(存储过程)中的一些异常,消息被回滚并重新传递。他们一次又一次地回滚并继续,直到我终止程序。我可以限制从队列中重新传递 jms 消息的次数吗?

另外在上述两种方法中,哪一种比较好?

【问题讨论】:

  • 哇,您真的可能想考虑编辑这个问题以使其更清晰......或者至少使用可理解的语法。我看到这篇文章中嵌入了很多熟悉的概念,但我仍然没有“大局”了解您的要求。
  • 如果我理解正确 - 问题是,当队列不可用时,如何避免发布者代码无休止地尝试将消息重新传递到队列?
  • 问题已编辑,如有任何混淆,我们深表歉意。
  • 根据编辑,我删除了我的回复。 Crowne 关于检查消息的回退计数的答案是我通常建议的。在 WMQ 中有一个队列配置可以为您执行此操作并且不需要任何代码。我不知道索尼克是否有类似的功能。
  • Sonic 确实有这个功能,它是 connectionfactory 的一个属性。我已经在下面的回答中详细说明了。

标签: java transactions jms messaging


【解决方案1】:

不知道您使用的是哪个消息传递提供商,我不知道这是否会对您有所帮助。

MQ 系列消息有一个回退计数器,可以通过在队列上配置强化回退计数器选项来启用它。
当我之前遇到这个问题时,我会这样做:

// get/receive message from queue

if ( backout counter > n ) {
   move_message_to_app_dead_letter_queue();
   return;
}
process_message();

MQ 系列标头字段可作为 JMS 属性访问。

如果您可以使用 XA 事务同时回滚或提交数据库和队列管理器,那么使用上述方法也会有所帮助。
然而,XA 事务确实会导致显着的性能损失,并且对于存储过程,这可能是不可能的。

另一种方法是将消息作为 blob 立即写入 message_table,然后从队列中提交消息。
在message_table上放置一个触发器来调用存储过程,然后将JMS响应机制添加到存储过程中。

【讨论】:

  • 非常感谢您的第一个建议!我尝试了第二种替代方法,但后端人员在触发器中有一些问题(数据库/应用程序级别)。所以我只在 java 代码中使用事务。
【解决方案2】:

如果您担心与可能已关闭的消息队列/代理/服务器/等通信,以及这会如何中断您尝试设计的更大进程的整体流程,那么您可能应该研究 JMS 队列支持服务器集群,因此当集群中的各个服务器出现故障时,您仍然可以可靠地生成/使用消息。

【讨论】:

    【解决方案3】:

    您的问题不是 100% 清楚,但问题似乎是您在处理消息时抛出了异常,而您确实不应该这样做。

    如果消息存在实际问题,例如 xml 格式错误或根据您的数据模型无效,您不想回滚事务。您可能想要记录错误,但您已经成功处理了该消息,在这种情况下,“成功”意味着您已将消息识别为有问题。

    另一方面,如果在处理消息时出现问题是由消息外部因素引起的(例如,数据库已关闭,或者目标主题不可用),您可能确实希望回滚事务,但是,您还需要确保在问题得到解决之前停止使用消息,否则您最终会遇到您描述的场景,您不断地反复处理相同的消息,并且每次尝试访问任何资源时都会失败目前不可用。

    【讨论】:

      【解决方案4】:

      接口progress.message.jclient.ConnectionFactory 有一个方法setMaxDeliveryCount(java.lang.Integer value),您可以在其中设置将消息重新传递到您的MessageConsumer 的最大次数。当这个次数达到时,它会被移动到SonicMQ.deadMessage队列中。

      您可以在“Sonic MQ 应用程序编程指南”一书第 210 页(版本 7.6)中查看这一点。

      至于你关于哪个更好的问题......这取决于存储过程是否会被多次执行。如果这是一个问题,您应该使用跨越 JMS 队列和数据库的事务(Sonic 支持 XA 事务)。如果您不介意执行多次,那么当您注意到代理已关闭时(很可能是当您尝试确认消息时),我会选择不确认消息并中止处理。这样,如果第一个处理器在连接失败后无法处理该消息,则另一个处理器能够处理该消息。

      如果消息的处理时间不定,您可能还需要查看 Sonic JMS 会话的SINGLE_MESSAGE_ACKNOWLEDGE 模式。通常,在消息上调用acknowledge() 也会确认它之前的所有消息。如果您正在无序地处理它们,那不是您想要发生的。在单消息确认模式(不在 JMS 标准中)中,acknowledge() 仅确认调用它的消息。

      【讨论】:

        猜你喜欢
        • 2012-06-12
        • 2016-03-04
        • 1970-01-01
        • 2015-01-08
        • 2018-07-15
        • 2011-03-24
        • 2015-07-23
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多