【发布时间】:2019-02-15 02:31:42
【问题描述】:
我在开发订阅 MQ 主题(MQ 版本 9)的应用程序时遇到问题。
我需要进行共享主题连接,因为应用程序将在多个实例(集群)中运行。
规格和文档说: “客户端使用非持久共享订阅,该客户端需要能够在多个消费者之间共享从主题订阅接收消息的工作。因此,非持久共享订阅可能有多个消费者。来自订阅将仅交付给该订阅中的一个消费者。”
对我来说,所有使用相同订阅名称的客户端都在同一个“集群”中,一次只有一个客户端会收到一条消息。
在我的代码中,受article 的启发,当第二个客户端尝试创建共享订阅时,我遇到了异常。我真的不明白这是 MQ 客户端库实现中的错误还是我的代码中的错误。
这是我的示例代码:
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
public class TestGB2 {
public static void main(final String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread(new MyThread("THREAD" + i, "TESTSUB/#", "myClient", "SUBTEST")).start();
}
}
public static class MyThread implements Runnable {
private final String topicString;
private final String clientId;
private final String subscriptionName;
public MyThread(final String threadName, final String topicString, final String clientId, final String subscriptionName) {
Thread.currentThread().setName(threadName);
this.topicString = topicString;
this.clientId = clientId;
this.subscriptionName = subscriptionName;
}
@Override
public void run() {
try {
System.out.println(String.format("%s : Connecting...", Thread.currentThread().getName()));
MQTopicConnectionFactory cf = new MQTopicConnectionFactory();
cf.setHostName("xxxx");
cf.setPort(1416);
cf.setQueueManager("xxxx");
cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
cf.setChannel("xxx");
cf.setClientID(clientId);
Connection con = cf.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
con.start();
Topic topic = session.createTopic(topicString);
MessageConsumer messageConsumer = session.createSharedConsumer(topic, subscriptionName); // fail here
System.out.println(String.format("%s : Waiting for a message...", Thread.currentThread().getName()));
Message msg = messageConsumer.receive();
System.out.println(String.format("%s : Received :\n%s", Thread.currentThread().getName(), msg));
}
catch (Exception ex) {
System.out.println(String.format("%s : FAILED", Thread.currentThread().getName()));
ex.printStackTrace();
}
}
}
}
下面的代码尝试创建 10 个线程来消费同一主题的消息。只有第一个线程能够连接,所有其他线程都失败并出现以下异常:
com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0026: Failed to subscribe to topic 'TESTSUB' with selector 'none' using MQSUB.
There may have been a problem creating the subscription due to it being used by another message consumer.
Make sure any message consumers using this subscription are closed before trying to create a new subscription under the same name. Please see the linked exception for more information.
at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:472)
at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:214)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:212)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:112)
at com.ibm.msg.client.wmq.internal.WMQConsumerShadow.initialize(WMQConsumerShadow.java:1038)
at com.ibm.msg.client.wmq.internal.WMQSyncConsumerShadow.initialize(WMQSyncConsumerShadow.java:134)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.<init>(WMQMessageConsumer.java:470)
at com.ibm.msg.client.wmq.internal.WMQSession.createSharedConsumer(WMQSession.java:938)
at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4228)
at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4125)
at com.ibm.mq.jms.MQSession.createSharedConsumer(MQSession.java:1319)
at TestGB.lambda$0(TestGB.java:33)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2042' ('MQRC_OBJECT_IN_USE').
at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:202)
... 11 more
尝试使用最后一个库:
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.1.1.0</version>
</dependency>
【问题讨论】:
-
嗨,谢谢,我尝试了以下情况:1)所有客户端的唯一客户端 ID + 相同的订阅名称:不起作用,2)唯一的客户端 ID + 随机子名称:有效但不是我想要的(所有客户端都会收到消息) 3)随机客户端 ID + 唯一订阅名称;不起作用,(所有客户端都收到消息)
-
我会尝试查找最新的库
-
我已经尝试使用最后一个 com.ibm.mq.allclient 库,仍然是同样的错误:(
-
我已经改变了:查看我更新的代码。正如你所说,我正在使用相同的客户端 ID 和相同的订阅名称,我希望订阅由所有客户端共享,但它仍然不起作用。
-
现在我使用的是 com.ibm.mq.allclient v 9.1.1.0
标签: java jms ibm-mq subscription shared