【问题标题】:JMS 2.0 - MQ 9 - Topic Shared subscription doesn't workJMS 2.0 - MQ 9 - 主题共享订阅不起作用
【发布时间】:2019-02-15 02:31:42
【问题描述】:

我在开发订阅 MQ 主题(MQ 版本 9)的应用程序时遇到问题。

我需要进行共享主题连接,因为应用程序将在多个实例(集群)中运行。

规格和文档说: “客户端使用非持久共享订阅,该客户端需要能够在多个消费者之间共享从主题订阅接收消息的工作。因此,非持久共享订阅可能有多个消费者。来自订阅将仅交付给该订阅中的一个消费者。”

对我来说,所有使用相同订阅名称的客户端都在同一个“集群”中,一次只有一个客户端会收到一条消息。

在我的代码中,受article 的启发,当第二个客户端尝试创建共享订阅时,我遇到了异常。我真的不明白这是 MQ 客户端库实现中的错误还是我的代码中的错误。

这是我的示例代码:

    import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;

import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;

public class TestGB2 {

    public static void main(final String[] args) throws Exception {

        for (int i = 0; i < 10; i++) {
            new Thread(new MyThread("THREAD" + i, "TESTSUB/#", "myClient", "SUBTEST")).start();
        }

    }

    public static class MyThread implements Runnable {

        private final String topicString;
        private final String clientId;
        private final String subscriptionName;

        public MyThread(final String threadName, final String topicString, final String clientId, final String subscriptionName) {
            Thread.currentThread().setName(threadName);
            this.topicString = topicString;
            this.clientId = clientId;
            this.subscriptionName = subscriptionName;
        }

        @Override
        public void run() {

            try {
                System.out.println(String.format("%s : Connecting...", Thread.currentThread().getName()));
                MQTopicConnectionFactory cf = new MQTopicConnectionFactory();
                cf.setHostName("xxxx");
                cf.setPort(1416);
                cf.setQueueManager("xxxx");
                cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
                cf.setChannel("xxx");
                cf.setClientID(clientId);

                Connection con = cf.createConnection();

                Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
                con.start();
                Topic topic = session.createTopic(topicString);
                MessageConsumer messageConsumer = session.createSharedConsumer(topic, subscriptionName); // fail here

                System.out.println(String.format("%s : Waiting for a message...", Thread.currentThread().getName()));
                Message msg = messageConsumer.receive();
                System.out.println(String.format("%s : Received :\n%s", Thread.currentThread().getName(), msg));

            }
            catch (Exception ex) {
                System.out.println(String.format("%s : FAILED", Thread.currentThread().getName()));
                ex.printStackTrace();
            }

        }

    }
}

下面的代码尝试创建 10 个线程来消费同一主题的消息。只有第一个线程能够连接,所有其他线程都失败并出现以下异常:

    com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0026: Failed to subscribe to topic 'TESTSUB' with selector 'none' using MQSUB.
There may have been a problem creating the subscription due to it being used by another message consumer.
Make sure any message consumers using this subscription are closed before trying to create a new subscription under the same name. Please see the linked exception for more information.
    at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:472)
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:214)
    at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:212)
    at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:112)
    at com.ibm.msg.client.wmq.internal.WMQConsumerShadow.initialize(WMQConsumerShadow.java:1038)
    at com.ibm.msg.client.wmq.internal.WMQSyncConsumerShadow.initialize(WMQSyncConsumerShadow.java:134)
    at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.<init>(WMQMessageConsumer.java:470)
    at com.ibm.msg.client.wmq.internal.WMQSession.createSharedConsumer(WMQSession.java:938)
    at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4228)
    at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4125)
    at com.ibm.mq.jms.MQSession.createSharedConsumer(MQSession.java:1319)
    at TestGB.lambda$0(TestGB.java:33)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2042' ('MQRC_OBJECT_IN_USE').
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:202)
    ... 11 more

尝试使用最后一个库:

<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.1.1.0</version>
</dependency>

【问题讨论】:

  • 嗨,谢谢,我尝试了以下情况:1)所有客户端的唯一客户端 ID + 相同的订阅名称:不起作用,2)唯一的客户端 ID + 随机子名称:有效但不是我想要的(所有客户端都会收到消息) 3)随机客户端 ID + 唯一订阅名称;不起作用,(所有客户端都收到消息)
  • 我会尝试查找最新的库
  • 我已经尝试使用最后一个 com.ibm.mq.allclient 库,仍然是同样的错误:(
  • 我已经改变了:查看我更新的代码。正如你所说,我正在使用相同的客户端 ID 和相同的订阅名称,我希望订阅由所有客户端共享,但它仍然不起作用。
  • 现在我使用的是 com.ibm.mq.allclient v 9.1.1.0

标签: java jms ibm-mq subscription shared


【解决方案1】:

问题总结

问题不在于您的程序,问题在于与您订阅的主题相关联的模型队列。

在队列管理器上,如果您查看订阅将匹配的主题对象,它将有一个指向模型队列的参数 MNDURMDL

如果您查看模型队列,您会注意到两个参数,其中一个或两个都可能导致您收到的错误:

[ DEFSOPT( EXCL | SHARED ) ]
[ SHARE | NOSHARE ]

这些必须设置为DEFSOPT(SHARED)SHARE。如果其中一个设置为另一个值,您将只能在共享订阅中拥有一个订阅者。


问题原因的其他详细信息

使用 IBM MQ Pub/Sub,当您创建 JMS 订阅时,MQ 将其​​视为托管订阅,IBM MQ 将在后台创建一个临时队列来订阅主题字符串。如果是非持久订阅,则队列是临时动态队列。

失败的原因是第一个线程将以独占模式打开临时动态队列,任何其他线程都无法打开临时动态队列并收到MQRC_OBJECT_IN_USE错误。


创建应用程序特定MNDURMDL 模型队列的可能原因

我怀疑这是因为 IBM 带有几个不同的默认模型队列。

非持久订阅者的默认设置具有以下设置:

   QUEUE(SYSTEM.NDURABLE.MODEL.QUEUE)      TYPE(QMODEL)
   DEFSOPT(SHARED)                         SHARE

还有另一个不是发布/订阅特定的默认队列具有这些设置:

   QUEUE(SYSTEM.DEFAULT.MODEL.QUEUE)       TYPE(QMODEL)
   DEFSOPT(EXCL)                           NOSHARE

您的主题对象创建的模型队列很可能是使用如下命令创建的,该命令默认使用SYSTEM.DEFAULT.MODEL.QUEUE 的设置。:

DEFINE QMODEL(xxx)

将来您可以专门设置这两个参数,或者使用LIKE 关键字定义它以强制它使用不同的队列来建模设置,两个命令如下:

DEFINE QMODEL(xxx) DEFSOPT(SHARED) SHARE
DEFINE QMODEL(xxx) LIKE(SYSTEM.NDURABLE.MODEL.QUEUE)

有关创建和使用特定于应用程序的 TOPIC 对象和模型队列的其他详细信息

默认情况下,树的根节点由名为SYSTEM.BASE.TOPIC的标准TOPIC对象表示,与此TOPIC关联的默认模型队列如下所示:

   TOPIC(SYSTEM.BASE.TOPIC)                TYPE(LOCAL)
   TOPICSTR()                              MDURMDL(SYSTEM.DURABLE.MODEL.QUEUE)
   MNDURMDL(SYSTEM.NDURABLE.MODEL.QUEUE)

如果您没有定义任何进一步的管理 TOPIC 对象,则所有主题都与 SYSTEM.BASE.TOPIC 匹配。此外,如果您没有定义任何进一步的管理 TOPIC 对象,并且您想授予应用程序对主题树的特定子集的权限(例如以 TESTSUB 开头的主题字符串),您必须通过 SYSTEM.BASE.TOPIC 授予权限,这在turn 授予应用程序访问任意主题字符串的权限,没有任何限制。

最佳实践是创建一个主题字符串的主题对象,该主题字符串与应用程序应有权访问的主题树部分相匹配。具体到您的 TESTSUB/# 示例,如果您的管理员定义了一个新的 TOPIC 对象并指定了 TOPICSTR(TESTSUB),则默认值会像这样创建它:

   TOPIC(TESTSUB.TOPIC)                    TYPE(LOCAL)
   TOPICSTR(TESTSUB)                       MDURMDL( )
   MNDURMDL( )

空白的MDURMDLMNDURMDL 值告诉MQ 使用树中下一个最接近的更高主题对象的值,如果没有定义其他内容,这将是SYSTEM.BASE.TOPIC,模型队列仍将默认为使用SYSTEM.DURABLE.MODEL.QUEUESYSTEM.NDURABLE.MODEL.QUEUE 模型队列。

管理员可以改为创建 TOPIC 对象并指定不同的模型队列,例如:

   TOPIC(TESTSUB.TOPIC)                    TYPE(LOCAL)
   TOPICSTR(TESTSUB)                       MDURMDL(TESTSUB.DURABLE.MODEL.QUEUE)
   MNDURMDL(TESTSUB.NDURABLE.MODEL.QUEUE)

通过这样做,他们可以定义特定于应用程序的模型队列,这些队列具有共享订阅所需的设置,并且不会影响 SYSTEM 模型队列。另一个好处是它们可以只为以TESTSUB 开头的主题字符串提供应用程序权限,例如TESTSUB/ATESTSUB/BTESTSUB/X/Y/Z

【讨论】:

  • 非常感谢,我认为您是对的。我检查了 SYSTEM.NDURABLE.MODEL.QUEUE,它被配置为不可共享。我们最近升级了MQ,我觉得这个参数变了。我会和管理员核实
  • 请注意,对于非持久订阅,它是SYSTEM.NDURABLE.MODEL.QUEUE,但对于持久订阅,它也是SYSTEM.DURABLE.MODEL.QUEUE,并且还应该使用DEFSOPT(SHARED) SHARE 进行设置。
  • 我和管理员核实过,在升级过程中,之前版本的一些默认值在迁移过程中丢失了。我们添加了 DEFSOPT(SHARED) 和 SHARE,它的工作就像一个魅力 :) 非常感谢
  • 我的管理员不想修改默认值,我去看看java端解决方案
  • @DanielSteinmann 用于创建 JMS 订阅者使用的 SYSTEM.MANAGED.[N]DURABLE.* 队列的模型队列仅从主题字符串解析为的 TOPIC 对象中提取,创建的队列由队列“管理”经理。事实上,对主题字符串具有sub 权限的应用程序 ID 甚至不需要对 TOPIC 对象指定的模型队列的权限,另一方面,如果应用程序创建了一个临时队列 (TQ),它将需要 inq以及模型队列上的put 和/或get,具体取决于它将是生产者和/或消费者。
猜你喜欢
  • 1970-01-01
  • 2021-06-09
  • 2017-10-08
  • 2020-09-17
  • 1970-01-01
  • 2015-01-17
  • 1970-01-01
  • 1970-01-01
  • 2023-03-22
相关资源
最近更新 更多