【问题标题】:Spring Integration - Request-Reply ImplementationSpring Integration - 请求-回复实现
【发布时间】:2016-11-24 04:36:19
【问题描述】:

我是 Spring Integration 的新手,也是 Stack Overflow 的新手。我正在寻求一些帮助来理解 Spring Integration,因为它与请求-回复模式有关。从网上阅读,我认为我应该使用服务激活器来启用这种类型的用例。

我正在使用 JMS 来促进基于 XML 的消息的发送和接收。我们的主要实现是 IBM Websphere MQ。

我也在使用 Spring Boot(版本 1.3.6.RELEASE)并尝试使用基于纯注解的配置方法(如果可能的话)。我已经在网上搜索并看到了一些示例,但到目前为止我所看到的没有任何东西可以帮助我理解它们是如何组合在一起的。 Spring Integration 文档非常好,但我仍在为如何将所有部分组合在一起而苦苦挣扎。如果有什么我错过了,我提前道歉。我将在这里发帖视为最后的选择。

这是我的配置:

package com.daluga.spring.integration.configuration

import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.jms.MQQueue;
import com.ibm.msg.client.wmq.WMQConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;

//import com.ibm.msg.client.services.Trace;

@Configuration
public class MQConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(MQConfiguration.class);

    @Value("${host-name}")
    private String hostName;

    @Value("${port}")
    private int port;

    @Value("${channel}")
    private String channel;

    @Value("${time-to-live}")
    private int timeToLive;

    @Autowired
    @Qualifier("MQConnectionFactory")
    ConnectionFactory connectionFactory;

    @Bean(name = "jmsTemplate")
    public JmsTemplate provideJmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setExplicitQosEnabled(true); 
        jmsTemplate.setTimeToLive(timeToLive);
        jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);     
        return jmsTemplate;
    }

    @Bean(name = "MQConnectionFactory")
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory ccf  = new CachingConnectionFactory();

        //Trace.setOn();

        try {
            MQConnectionFactory mqcf = new MQConnectionFactory();
            mqcf.setHostName(hostName);
            mqcf.setPort(port);
            mqcf.setChannel(channel);
            mqcf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            ccf.setTargetConnectionFactory(mqcf);
            ccf.setSessionCacheSize(2);   
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

        return ccf;
    }

    @Bean(name = "requestQueue")
    public Destination createRequestQueue() {

        Destination queue = null;

        try {
            queue = new MQQueue("REQUEST.QUEUE");
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

        return queue;
    }

    @Bean(name = "replyQueue")
    public Destination createReplyQueue() {

        Destination queue = null;

        try {
            queue = new MQQueue("REPLY.QUEUE");
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

        return queue;
    }

    @Bean(name = "requestChannel")
    public QueueChannel createRequestChannel() {

        QueueChannel channel = new QueueChannel();

        return channel;
    }

    @Bean(name = "replyChannel")
    public QueueChannel createReplyChannel() {

        QueueChannel channel = new QueueChannel();

        return channel;
    }

}

这是我的服务类:

package com.daluga.spring.integration.service

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Service;


@Service
public class MyRequestReplyService {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyRequestReplyService.class);

    @ServiceActivator(inputChannel = "replyChannel")
    public void sendAndReceive(String requestPayload) {
        // How to get replyPayload
    }

}

因此,在这一点上,我不太确定如何将所有这些粘合在一起以使其工作。我不明白如何将我的请求和回复队列粘合到服务激活器以使这一切正常工作。

我正在调用的服务(基于 JMS/Webshere MQ)使用典型的消息和相关 ID,以便我可以将请求正确地绑定到相应的响应。

任何人都可以为我提供有关如何使其工作的任何指导吗?请让我知道我可以提供哪些其他信息来说明这一点。

提前感谢您的帮助!

【问题讨论】:

    标签: spring-integration ibm-mq


    【解决方案1】:

    网关提供请求/回复语义。

    您应该使用Spring Integration's built-in JMS Support,而不是直接使用 JmsTemplate。

    @Bean
    @ServiceActivator(inputChannel="requestChannel")
    public MessageHandler jmsOutGateway() {
        JmsOutboundGateway outGateway = new JmsOutboundGateway();
        // set properties
        outGateway.setOutputChannel(replyChannel());
        return outGateway;
    }
    

    如果您想自己动手,请更改服务激活方法以返回回复类型并使用模板sendAndReceive()convertSendAndReceive() 方法之一。

    sample app 使用 XML 配置,但应该提供一些额外的指导。

    【讨论】:

    • 谢谢,加里!这有帮助。我退后一步,决定使用基于 xml 的配置。我已配置入站和出站网关,并且可以看到在我的回复队列中创建了消息使用者。启动过程中没有错误。我能够将消息放入回复队列(使用 JMS 实用程序)并看到它被拾取。但我还不太了解的是如何通过服务激活器发起对请求队列的调用。我创建了一个类并使用 MessageEndpoint 注释它,并使用 ServiceActivator 在该类上添加一个方法。然后我通过 main 方法调用它。
    • requestChannel() 更改为DirectChannel 并向其发送Message<?> - 直接或通过Messaging Gateway。我们通常推荐后者,而不是直接与消息传递基础设施交互的用户代码。查看示例应用以获取示例。
    • 加里,感谢您的指导!我能够将它全部连接起来并正常工作。一旦您了解发生了什么,就很容易将其中一个连接起来。保重,丹。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-05-29
    • 1970-01-01
    • 1970-01-01
    • 2020-11-03
    • 2013-04-21
    • 2014-07-30
    • 1970-01-01
    相关资源
    最近更新 更多