【问题标题】:Apache Camel and IBM MQApache Camel 和 IBM MQ
【发布时间】:2016-05-05 11:19:53
【问题描述】:

寻找 Apache Camel 和 IBM MQ 之间集成的一些示例。

Apache Camel 充当 MQ 和我们基于 Java 的套接字应用程序之间的中间件路由器。

【问题讨论】:

标签: apache-camel ibm-mq


【解决方案1】:

我是使用带有 MQConnectionFactory 的 JmsComponent 完成的,这是我的代码:

@Bean(name = "wmq")
public JmsComponent wmq() throws JMSException {

    JmsComponent wmq = new JmsComponent();

    MQConnectionFactory mqConnectionFactory =  new MQConnectionFactory();
    mqConnectionFactory.setTransportType(1);
    mqConnectionFactory.setHostName("localhost");
    mqConnectionFactory.setPort(1414);
    mqConnectionFactory.setQueueManager("QMGRSCORE");
    mqConnectionFactory.setChannel("EXTAPP.SRVCONN");

    wmq.setConnectionFactory(mqConnectionFactory);

    return wmq;

}

在我的路线中,我这样称呼:

from("wmq:queue:myqueue").id("myMq")

【讨论】:

    【解决方案2】:

    我所能得到的最好的记录在下面,说明为一个 Spring XML 应用程序上下文,它本身托管 CAMEL 上下文和路由。此示例与 IBM 本机 MQ JCA 兼容的资源适配器 v7.5、CAMEL 2.16、Spring core 4.2 一起使用。我已经将它部署在 Glassfish、Weblogic 和 JBoss EAP7 服务器中。

    复杂性必然会处理 MQ 报告的流程,其理念与普通 JMS 回复消息的理念相冲突。详细解释请参考Implementing native websphere MQ with CoD over Camel JMS component

    这个基于 CAMEL XML DSL 的示例是独立的,易于测试。

    我们从 Spring 和 CAMEL 声明开始:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">
    

    CAMEL 上下文遵循 2 条路线:MQ 到 JMS 和 JMS 到 MQ,这里链接起来形成一个桥梁以简化测试。

    <camel:camelContext id="mqBridgeCtxt">
    <camel:route id="mq2jms" autoStartup="true">
    

    奇怪:在 Weblogic 上,获取(例如)3 个侦听器的唯一方法是强制执行 3 个连接(依次使用 3 个 Camel:from 语句),每个连接最多 1 个会话,否则会出现 MQ 错误:MQJCA1018:只有一个会话允许每个连接。在 JBoss 上,您可以简单地调整 concurrentConsumers=...

      <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/> 
    

    上面的 disable disableReplyTo 选项确保 CAMEL 在我们测试 MQ 消息类型为 1=Request(-reply) 或 8=datagram(一种方式!)之前不会产生回复。此处未说明测试和回复结构。

    然后我们在下一次发布到普通 JMS 时将 EIP 强制为 InOnly,以与 Inbound MQ 模式保持一致。

      <camel:setExchangePattern pattern="InOnly"/>
      <!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
      <camel:to uri="ref:innerQueue" />
    </camel:route>
    

    这结束了 MQ 到 jms 的路由;接下来是 jms-to-MQ 路由仍然在相同的 CAMEL 上下文中:

    <camel:route id="jms2mq"  autoStartup="true">
      <camel:from uri="ref:innerQueue" />
      <!-- remove inner message headers and properties to test without inbound side effects! -->
      <camel:removeHeaders pattern="*"/> 
      <camel:removeProperties pattern="*" />
      <!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->
    

    现在是远程目标返回的 MQ CoD 报告的请求标志。我们还强制 MQ 消息为数据报类型(值 8)。

      <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
      <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
      <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>
    

    ReplyTo 队列可以通过 ReplyTo uri 选项指定,也可以作为下面的标头指定。

    接下来,我们使用 CamelJmsDestinationName 头来强制抑制 JMS MQ 消息头 MQRFH2(使用 targetClient MQ URL 选项值 1)。换句话说,我们想发送一个普通的 MQ 二进制消息(即只有 MQMD 消息描述符后跟有效负载)。

      <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
      <camel:setHeader headerName="CamelJmsDestinationName"> <camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>
    

    更多 MQMD 字段可以通过保留的 JMS 属性来控制,如下图所示。请参阅 IBM 文档中的限制。

      <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR   </camel:constant></camel:setHeader>
      <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>
    

    URI 中的目标队列被上面的 CamelJmsDestinationName 覆盖,因此 URI 中的队列名称成为占位符。

    根据观察,URI 选项 preserveMessageQos 允许发送设置了 ReplyTo 数据的消息(以获取 MQ CoD 报告),但通过强制 InOnly MEP 阻止 CAMEL 实例化回复消息侦听器。

      <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp;
                exchangePattern=InOnly&amp;preserveMessageQos=true&amp;
                includeSentJMSMessageID=true" />
    </camel:route>
    </camel:camelContext>
    

    我们还没有完成,我们还需要为本地 JMS 提供程序和 Websphere MQ(通过本地 IBM WMQ JCA 资源适配器)声明队列工厂,以便根据您的上下文进行调整。我们在这里使用 JNDI 查找管理对象。

    <camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
    </camel:endpoint>
    
    <jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
    <jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>
    
    <bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
      <property name="connectionFactory" ref="jmsraQCFBean" />
    </bean>
    
    <bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
      <property name="connectionFactory" ref="mqQCFBean" />
    </bean>
    
    </beans>
    

    从 JNDI 获取工厂(和 JCA 适配器)的替代方法是将 JMS 客户端声明为 Spring bean。在 Weblogic 和 Glassfish 中,通过部署本机 IBM JCA 资源适配器并创建 JNDI 资源,然后在上面的 Spring 上下文中引用,您会得到更好的启发,在 JBoss 中,直接 MQ 客户端 bean 声明最适合如下)

    <bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
        <property name="hostName" value="${mqHost}"/>
        <property name="port" value="${mqPort}"/>
        <property name="queueManager" value="${mqQueueManager}"/>
        <property name="channel" value="${mqChannel}"/>
        <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
        <property name="appName" value="${connectionName}"/>
    </bean>
    
    <bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory" ref="mqCFBean"/>
        <property name="transacted" value="true"/>
        <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
    </bean>
    

    欢迎提出意见和改进。

    【讨论】:

      【解决方案3】:

      您应该在组件扫描包中定义一个 JmsComponent bean,并为其提供所有连接工厂属性并在您的路由中使用该 bean。 示例:

      @Bean
      public JmsComponent ibmmq(){
          JmsComponent ibmmq = new JmsComponent();
          MQTopicConnectionFactory factory = new MQTopicConnectionFactory();
          factory.setQueueManager("yyy"); //yyy is you QueueManager
          factory.setHostName("zzz");  //zzz is your host
          factory.setPort(111);
          factory.setTransportType(1);
          ibmmq.setConnectionFactory(factory);
          return ibmmq; 
      }
      

      并在您的路线中添加以下内容

        camelContext.addRoutes(
                      new RouteBuilder()
                      {
                          @Override
                          public void configure() throws Exception
                          {
                              from("ibmmq:topic:YY.ZZZ").to("stream:out"); //YY.ZZZ is your topic name. If Queue use ibmmq:queue:YY.ZZZ.
                          }
                      });
      

      如果您使用“stream:out”进行测试,请在依赖项和 camel-stream 中包含 camel-jms 和 IBM MQ 库。

      【讨论】:

        【解决方案4】:

        【讨论】:

          猜你喜欢
          • 2018-02-08
          • 2018-06-27
          • 1970-01-01
          • 2016-05-07
          • 2019-08-27
          • 2015-04-22
          • 2018-02-10
          • 2021-10-26
          • 1970-01-01
          相关资源
          最近更新 更多