【问题标题】:How to send message to different Queue hosted in different queue manager and hostname in IBM MQ cluster如何将消息发送到 IBM MQ 集群中不同队列管理器和主机名中托管的不同队列
【发布时间】:2017-09-17 11:08:18
【问题描述】:

我的基于 Apache-camel 的应用程序正在使用来自 IBM 队列之一的消息,例如以下是连接工厂的详细信息

hostname=host1000
QManager=QM1000
Port="some port"
Channel="common channel"

Camel 流消费和处理并将来自消息头的响应发送到ReplyQueue。

 from(wmq:queue:<INPUT_QUEUE>)
.bean("processBean")
.bean("beanToSendMsgToReplyQueue")

在骆驼标题中,我低于 JMSReplyQueue。你可以看到它是一个不同的队列管理器,这个队列管理器来自不同的主机,但是在集群环境中。

JMSReplyTo = queue://QM1012/TEST.REPLY?targetClient=1

队列管理器也介于两者之间。喜欢

queue://<queue-manager>//<queue-name>?<other parameters>

以下是我在发送消息时遇到的异常。

ERROR o.apache.camel.processor.DefaultErrorHandler:215 - Failed delivery for (MessageId: ID-xxxxxxxxx-0-4 on ExchangeId: ID-xxxxxx-42443-1492594420697-0-1). Exhausted after delivery attempt: 1 caught: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: wmq://queue://QM1012/TEST.REPLY?targetClient=1 due to: Failed to resolve endpoint: wmq://queue://TAP2001R5/TEST?targetClient=1 due to: There are 1 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{targetClient=1}]. Processed by failure processor: FatalFallbackErrorHandler[Pipeline[[Channel[sendTo(Endpoint[wmq://queue:BACKOUT_Q])], Channel[DelegateSync[com.xxx.yyy.listener.XXXOnExceptionProcessor@21c66ee4]], Channel[Stop]]]]

谁能帮我将消息发送到不同的队列管理器队列,这些队列位于不同的主机中,但都在同一个集群中。队列管理器名称也出现在字符串中间,那么如何解决这个问题。 如果您需要更多详细信息,请告诉我。

更新-1: 尝试使用相同的队列管理器且不带参数

JMSReplyTo = queue://QM1000/QUEUE_V1 我得到以下异常

org.springframework.jms.InvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'QM1000/QUEUE_V1'.; nested exception is com.ibm.msg.client.jms.DetailedInvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'QM1000/QUEUE_V1'. JMS attempted to perform an MQOPEN, but WebSphere MQ reported an error. Use the linked exception to determine the cause of this error. Check that the specified queue and queue manager are defined correctly.; nested exception is com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2189' ('MQRC_CLUSTER_RESOLUTION_ERROR').

Update-2

我可以使用普通的 javax.jms.* 和 com.ibm.mq.jms.* api 向 JMSReplyTo 发送消息,但不能通过 Apache camel。骆驼用户/开发人员组的任何人都可以帮助我使用骆驼组件处理相同的事情吗?

@Override
public void process(Exchange exchange)
    throws Exception {

    QueueConnection m_connection = this.connectionFactory.createQueueConnection();
    //m_connection.start();
    boolean transacted = false;

    QueueSession session = m_connection.createQueueSession(transacted, QueueSession.AUTO_ACKNOWLEDGE);
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText(exchange.getIn().getBody());
    MQQueue mq = new MQQueue(
        "queue://QM1012/TEST.REPLY");
    QueueSender queueSender = session.createSender((MQQueue) mq);
    queueSender.send(outMessage);

    /* producerTemplate.send("wmq:" + "queue://QM1012/TEST.REPLY", exchange); */
}

【问题讨论】:

  • targetClient=1 表示 JMS 不应为 JMS 标头添加 IBM MQ 类(称为 RFH2 标头),换句话说,远程应用程序可能不是 JMS 应用程序。您是否可能需要在 Camel 流程中以不同的方式指定它?
  • 您所拥有的 URI 减去 wmq:// 是您如何将特定队列管理器上的队列指定给 IBM MQ JMS 类。我不熟悉骆驼来帮助你更多。如果 MQ 7.0 或更早版本不受支持,或者在 MQ 7.1 及更高版本中,您需要获得 SCTQ 的权限,您可以提供对特定远程队列管理器的 rqmname 的权限。
  • 您只需连接到使用原始消息的同一个队列管理器,并通过您指定的 URL 发送它。只要您连接到的队列管理器是另一个队列管理器所在的集群的一部分,并且您拥有适当的权限,就可以发送它。您只需要连接到的队列管理器的连接详细信息,因为您不会直接连接到其他队列管理器。
  • 在对 Camel 进行任何操作之前,我建议您在与 Camel 相同的主机上测试您的 uri 以及使用 RFHUTIL 将消息发送到 MQ 集群。如果你成功了,那么 MQ 的东西就会按预期工作。 Camel 的 URI 应该遵循 MQ uri 模式。但首先让它与 RFHUTIL 一起工作。
  • 在您的第一个示例中,QM1000 是您从中读取请求消息的队列管理器,而 QM1012 是回复队列所在的位置?如果是这种情况,则应将回复消息放入 queue://QM1012/QUEUE_V1,但您的连接仍将是 QM1000。 @SoucianceEqdamRashti 建议是验证 URI 是否正确的绝佳方法。

标签: java apache-camel ibm-mq


【解决方案1】:

您希望与两个不同的队列管理器进行通信,因此您需要相应地定义两个 Camel JMS 组件实例。 Camel 无法神奇地知道 QM1000 或 QM1012 的含义以及如何访问 QM。

您首先需要两个 WMQ QM 的两个 JMS 连接工厂实例。如何获得这些取决于您的执行环境。在 JEE 服务器上,配置后可以使用 JNDI 访问连接池。查看有关如何设置 JMS 池的 appserver 文档。如果您独立运行,请查看 Spring JMS 连接缓存或 Atomikos(如果您想要 XA 事务)。 假设 QM1000 的 CF 是 sourceCF 而 QM1012 的 CF 是 targetCF。

现在您可以定义两个 Camel JMS 组件实例,一个用于每个 QM。将连接工厂注入 JMS 组件 (.setConnectionFactory(...))。 假设你定义了一个 ID 为“jmssource”的 Camel JMS 组件,注入 sourceCF。在 JMS 组件 ID“jmstarget”中注入 targetCF。 如何做到这一点取决于您的环境(JEE/CDI、Spring、纯 Java)。看看stackoverflow,有例子。

您现在可以使用以下语法在 Camel 路由上指定 Camel JMS 生产者和消费者:

.from("jmssource:INPUT_QUEUE")
  ...
  (do some processing)
  ...
  .to("jmstarget:QUEUE_V1")

您不能使用 Camel 的 JMS 回复逻辑(使用 JMSReplyTo 标头)回复另一个队列管理器。我认为 JMS 标准不允许这样做。您需要通过发送到回复队列来明确地进行回复。

对于设置 targetClient 选项,目标解析器可能很有用:

import org.springframework.jms.support.destination.DynamicDestinationResolver;
import org.springframework.jms.support.destination.DestinationResolver;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQDestination;

public class WMQDestinationResolver extends DynamicDestinationResolver implements DestinationResolver {
  private int targetClient = JMSC.MQJMS_CLIENT_JMS_COMPLIANT;

  public void setTargetClient(int targetClient) {
    this.targetClient = targetClient;
  }

  public Destination resolveDestinationName(Session session, String destinationName, boolean isPubSubDomain) throws JMSException {
    Destination destination = super.resolveDestinationName(session, destinationName, isPubSubDomain);
    if (destination instanceof MQDestination) {
      MQDestination mqDestination = (MQDestination) destination;
      mqDestination.setTargetClient(targetClient);
    }
    return destination;
  }
}

【讨论】:

  • 嗨@Sebastian Brandt,感谢您的建议。如果有多个队列管理器,那应该是正确的方法(使用多个骆驼 jms 组件)。我们也不能直接发送
  • 你能详细说明为什么不能直接发送到回复队列吗?您可以将 Camel 交换模式更改为 InOnly,然后 Camel 不会自行发送回复。从技术上讲,Camel 使用 InOut 模式发送响应还是使用 InOnly 模式显式发送响应都没有区别。
  • 抱歉,我的上一篇文章不完整。嗨@Sebastian Brandt,感谢您的建议。如果有多个队列管理器,那应该是正确的方法(使用多个骆驼 jms 组件)。我们也不能直接在骆驼端点中发送 targetClient=1 。对于 Camel JMS 组件 API 文档提供了一些信息(部分:在目标上设置 JMS 提供程序选项)。默认情况下,骆驼的 disableReplyTo=false 怎么样。使用该功能可以正常工作,并且 Camel 能够将消息发送到位于不同队列管理器(集群)中的队列。
  • 我将很快使用默认骆驼功能发布我的工作示例。
  • 我将我的答案固定到 targetClient / 目标解析器。
【解决方案2】:

首先,感谢大家的支持。 我的用例如下(上面也有)。

使用 Apache Camel 连接到 MQ 主机(主机名、队列管理器、端口、通道),并使用属于同一主机/Qmanager 的队列中的消息。消息带有replyToQueue (JMSReplyTo) 标头值。 ReplyToQueue(JMSReplyTo)的值如下

例如

queue://Different_QueueManager_in_Cluster/TEST.REPLY?mdReadEnabled=true&amp;messageBody=0&amp;mdWriteEnabled=true&amp;XMSC_WMQ_REPLYTO_STYLE=1&amp;targetClient=1

现在的问题是,当连接对象连接到上述主机和队列管理器时,如何使用不同的队列管理器将回复消息发送到不同的队列。

注意:所有 MQ 队列管理器都在集群环境中。

解决方案 1: 例如

form(wmq:queue:INPUT_MSG_Q)
 .bean(requestProcessor)
 .bean(responseProcessor)

Apache Camel 默认处理ReplyToQ (JMSReplyTo)。如果您不想回复回复到ReplyToQ (JMSReplyTo),请在使用时使用disableReplyTo=true

注意:在发送到queue://Different_QueueManager_in_Cluster/TEST.REPLY 时,使用相同的连接/连接工厂,MQ 集群将检查消息是否必须发送到集群中指定队列的指定队列管理器。关于以下参数?mdReadEnabled=true&amp;messageBody=0&amp;mdWriteEnabled=true&amp;XMSC_WMQ_REPLYTO_STYLE=1&amp;targetClient=1,Apache Camel 可以在不使用任何第三方解析器的情况下自动解析,同时自动回复JMSReplyTo

解决方案 2:

使用disableReplyTo=true 禁用自动回复并从标头获取队列详细信息并使用普通 javax.jms.* 和 com.ibm.mq.jms.* api 发送消息。代码相同如下。

@Override
public void process(Exchange exchange)
    throws Exception {

    QueueConnection m_connection = this.connectionFactory.createQueueConnection();
    //m_connection.start();
    boolean transacted = false;

    QueueSession session = m_connection.createQueueSession(transacted, QueueSession.AUTO_ACKNOWLEDGE);
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText(exchange.getIn().getBody());
    MQQueue mq = new MQQueue(
        "queue://Different_QueueManager_in_Cluster/TEST.REPLY");
    QueueSender queueSender = session.createSender((MQQueue) mq);
    queueSender.send(outMessage);

    /* producerTemplate.send("wmq:" + "queue://Different_QueueManager_in_Cluster/TEST.REPLY", exchange); */
}

对于参数,使用@Sebastian Brandt(帖子)提到的目标解析器

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-10-03
    • 2021-01-03
    • 2023-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多