【问题标题】:Sending amqp message to ibm mq向 ibm mq 发送 amqp 消息
【发布时间】:2019-08-29 09:21:50
【问题描述】:

尝试将消息从 RabbitMQ <int-amqp:inbound-channel-adapter 传输到 MQSeries <int-jms:outbound-channel-adapter。这很好用。

实际上,MQSeries 上的一些生产者使用这样的 IBM JMS 类:

MQMessage messageMQ = new MQMessage();
messageMQ.format = "        ";
messageMQ.persistence = 1;
messageMQ.correlationId = MQ_MESSAGE_CORRELATION_ID;
messageMQ.write(message.getMessageData());
MQPutMessageOptions putMessageOption = new MQPutMessageOptions();
putMessageOption.options = 8194;
MQQueue queue = openQueue(destinataire, 8208);
queue.put(messageMQ, putMessageOption);

我尝试像这样在 amqp 和 jms 之间使用转换器:

<int:transformer id="testTransformer" ref="testTransformerBean" input-channel="fromRabbit"
         method="transform" output-channel="toJms"/>


public MQMessage transform(Message<?> msg) throws Exception {

    MQMessage result = new MQMessage();
    result.format = "        ";
    result.persistence = 1;
    result.correlationId = MQC.MQCI_NONE;

    String test = "message to send ";
    result.write(test.getBytes());
    return result;
}

msg.getPayload() 中存储的 Object 类型是什么?如何将其转换为 String 对象?

实施此方法时,我有一个例外,因为出站需要 JMS 消息而不是 com.ibm.mq.MQMessage!

Cannot convert object of type [com.ibm.mq.MQMessage] to JMS message

这种方式正确吗?

或者我应该删除出站通道并使用服务激活器来代替 IBM 的特定代码?

感谢您的帮助

问候

按照 Artem 的回答进行编辑

按照jms出站配置:

<bean id="jmsConnectionFactory" class="com.ibm.mq.jms.MQConnectionFactory">
    <property name="queueManager" value="${queueManager}" />
    <property name="hostName" value="${hostName}" />
    <property name="port" value="${port}" />
    <property name="channel" value="${channelName}" />
    <property name="transportType" value="1" />
</bean>
<bean id="jmsQueue" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
    <property name="baseQueueManagerName" value="${queueManager}" />
    <property name="baseQueueName" value="${queueName}" />
    <property name="targetClient" value="1" />
</bean>
<bean id="jmsConnectionFactory_cred"
    class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
    <property name="username" value="${user}"/> 
    <property name="password" value="${password}"/> 
</bean> 

<bean id="connectionFactoryCaching"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsConnectionFactory_cred" />
    <property name="sessionCacheSize" value="${BRIDGE_MQ_OUTBOUND_SESSION_CACHE}" />
</bean>

<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice" id="requestHandler">
    <property name="trapException" value="false"/>
    <property name="onFailureExpressionString" value="#this"/>
    <property name="failureChannel" ref="processChannel1"/>
</bean>

<int-jms:outbound-channel-adapter   channel="channelRmqMQ" 
                                    id="jmsOut" destination="jmsQueue" connection-factory="connectionFactoryCaching" delivery-persistent="true" 
                                    explicit-qos-enabled="true" session-transacted="true" >
    <int-jms:request-handler-advice-chain>
        <ref bean="requestHandler" />
    </int-jms:request-handler-advice-chain>                                         
</int-jms:outbound-channel-adapter>

【问题讨论】:

  • 那不是 MQ JMS 代码。它使用 MQ Java 类 - 底层 MQI 的更直接表示。
  • 你想在你的transform 方法中做什么?您是否正在尝试创建 JMS 消息?如果是这样,那么它不应该返回类似于 javax.jms.message 的东西吗?应该使用 javax.jms.JMSContext 对象上的 create 方法创建?
  • 我尝试将消息转换为 MQMessage。我需要使用这个类,因为在消费者使用的消息上设置了一些属性(就像我的问题中定义的那样)。如何将有效负载(来自 amqp Rabbitmq)转换为字符串?感谢您的帮助

标签: spring-integration ibm-mq


【解决方案1】:

如果你的AMQP消息带有text/*contentType,那么它的正文会被AmqpInboundChannelAdapter中的开箱即用的SimpleMessageConverter自动转换为字符串:

     if (contentType != null && contentType.startsWith("text")) {
            String encoding = properties.getContentEncoding();
            if (encoding == null) {
                encoding = this.defaultCharset;
            }

            try {
                content = new String(message.getBody(), encoding);
            } catch (UnsupportedEncodingException var8) {
                throw new MessageConversionException("failed to convert text-based Message content", var8);
            }

否则你需要在两者之间放置一个简单的转换器来将byte[] 转换为字符串:

<object-to-string-transformer>

&lt;int-jms:outbound-channel-adapter&gt; 正好用于 JMS 协议交互,因此,您的 MQMessage 不会在那里被接受。这就是你得到Cannot convert object of type [com.ibm.mq.MQMessage] to JMS message 异常的原因。

是的,您可以在一些自定义 service-activator 中直接使用 IBM MP API,但是我建议您查看 IBM WebSphere 上的 JMS 到 MQ 桥接器。那么你只需要配置一个合适的连接工厂并从&lt;int-jms:outbound-channel-adapter&gt;使用它:

<jee:jndi-lookup id="jndiMqConnectionFactory" jndi-name="${mqConnectionFactory}"/>

<bean id="jmsQueueConnectionFactory"
          class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory" ref="jndiMqConnectionFactory"/>
        <property name="username" value="${mqLogin}"/>
        <property name="password" value="${mqPassword}"/>
    </bean>

<jee:jndi-lookup id="myMqQueue" jndi-name="queue/myMqQueue"/>

<bean id="mqQueueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsQueueConnectionFactory"/>
    <property name="defaultDestination" ref="myMqQueue"/>
</bean>

<jms:outbound-channel-adapter channel="myMqChannel" jms-template="mqQueueJmsTemplate"/>

【讨论】:

  • 以字符串形式获取消息是可以的!我已经使用了&lt;int-jms:outbound-channel-adapter&gt;,它工作得很好;-)(请参阅我在问题中的编辑)以前的 JMS 代码正在发送 jms 标准中未定义的其他属性,例如 messageMQ.format = " "; 所以我想在我的出站频道。有没有办法做到这一点?
  • 在发送到&lt;int-jms:outbound-channel-adapter&gt; 之前,尝试使用header-enricher 将所有这些自定义属性映射到headersDefaultJmsHeaderMapper 会将这些自定义标头映射到 jmsMessage.setObjectProperty(),我希望它们已经通过 JMS 桥被重新映射到 WebSphere 上的 MQMessage
  • 我尝试使用标题丰富器并尝试更新消息格式,例如&lt;int:header name="format" expression="'MQSTR '" overwrite="true"/&gt;。调用 DefaultJmsHeaderMapper 并设置属性 jmsMessage.setObjectProperty(propertyName, value); 但不幸的是,在我的 MQExplorer 中,格式未更新。感谢您的帮助
猜你喜欢
  • 1970-01-01
  • 2021-06-15
  • 1970-01-01
  • 2016-04-08
  • 2013-08-31
  • 1970-01-01
  • 2018-09-14
  • 1970-01-01
  • 2015-06-30
相关资源
最近更新 更多