【问题标题】:Spring Integration 4 asynchronous request/responseSpring Integration 4 异步请求/响应
【发布时间】:2014-07-16 09:58:47
【问题描述】:

我正在尝试使用 Spring Integration v4 的 DSL API 编写一个简单的消息流,如下所示:

       -> in.ch -> Processing -> JmsGatewayOut -> JMS_OUT_QUEUE
Gateway
       <- out.ch <- Processing <- JmsGatewayIn <- JMS_IN_QUEUE

由于请求/响应是异步的,当我通过初始网关注入消息时,消息会一直传递到 JMS_OUT_QUEUE。在此消息流之外,回复消息被放回 JMS_IN_QUEUE,然后由 JmsGatewayIn 拾取。此时,消息被处理并放入 out.ch (我知道响应到达 out.ch,因为我有一个记录器拦截器记录放置在那里的消息)但是,网关从未收到响应。

此消息流之外的系统从 JMS_OUT_QUEUE 获取消息并将响应放入 JMS_IN_QUEUE,而不是响应,而是在其自己的 JmsOutboundgateway 上接收到 javax.jms.MessageFormatException: MQJMS1061: Unable to deserialize object(我认为它无法反序列化 jms 回复对象通过查看日志)。

我显然没有正确配置某些东西,但我不知道具体是什么。有谁知道我错过了什么?

使用 spring-integration-core-4.0.3.RELEASE、spring-integration-jms-4.0.3.RELEASE、spring-integration-java-dsl-1.0.0.M2、spring-jms-4.0.6 .释放。

我的网关配置如下:

@MessagingGateway
public interface WsGateway {

    @Gateway(requestChannel = "in.ch", replyChannel = "out.ch", 
        replyTimeout = 45000)
    AResponse process(ARequest request);
}

我的集成流程配置如下:

@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class IntegrationConfig {

    @Bean(name = "in.ch")
    public DirectChannel inCh() {
        return new DirectChannel();
    }

    @Bean(name = "out.ch")
    public DirectChannel outCh() {
        return new DirectChannel();
    }   

    @Autowired
    private MQQueueConnectionFactory mqConnectionFactory;

    @Bean
    public IntegrationFlow requestFlow() {

        return IntegrationFlows.from("in.ch")
                .handle("processor", "processARequest")
                .handle(Jms.outboundGateway(mqConnectionFactory)
                        .requestDestination("JMS_OUT_QUEUE")
                        .correlationKey("JMSCorrelationID")
                .get();
    }

    @Bean
    public IntegrationFlow responseFlow() {

        return IntegrationFlows.from(Jms.inboundGateway(mqConnectionFactory)
                .destination("JMS_IN_QUEUE"))
                .handle("processor", "processAResponse")
                .channel("out.ch")
                .get();
    }
}

感谢您对此的任何帮助, 下午。

【问题讨论】:

    标签: spring-integration spring-jms spring-dsl


    【解决方案1】:

    首先你的配置不好:

    1. 既然你是从WsGateway#process 开始的,你真的应该在那里等待回复。 网关的请求/回复能力基于TemporaryReplyChannel,它作为不可序列化的值被放置到headers

    2. 只要你等待依赖那个网关,实际上没有理由提供replyChannel,如果你不打算在回复上做一些发布-订阅逻辑。

    3. 当您向 JMS 队列发送消息时,您应该了解消费者部分可能是一个单独的远程应用程序。最后一个可能对你的out.ch一无所知。

    4. JMS 请求/回复功能确实基于JMSCorrelationID,但这还不够。这里还有一件事是ReplyTo JMS 标头。因此,如果您要发送来自消费者的回复,您真的应该只依赖JmsGatewayIn 的东西。

    所以我会把你的代码改成这样:

    @MessagingGateway
    public interface WsGateway {
    
        @Gateway(requestChannel = "in.ch", replyTimeout = 45000)
        AResponse process(ARequest request);
    }
    
    @Configuration
    @EnableIntegration
    @IntegrationComponentScan
    @ComponentScan
    public class IntegrationConfig {
    
        @Bean(name = "in.ch")
        public DirectChannel inCh() {
            return new DirectChannel();
        }
    
        @Autowired
        private MQQueueConnectionFactory mqConnectionFactory;
    
        @Bean
        public IntegrationFlow requestFlow() {
            return IntegrationFlows.from("in.ch")
                    .handle("processor", "processARequest")
                    .handle(Jms.outboundGateway(mqConnectionFactory)
                            .requestDestination("JMS_OUT_QUEUE")
                            .replyDestination("JMS_IN_QUEUE"))
                    .handle("processor", "processAResponse")
                    .get();
        }
    
    }
    

    如果它适合您,请告诉我,或者尝试解释为什么您在一个one-way 案例中使用two-way 网关。也许Jms.outboundAdapter()Jms.inboundAdapter() 更适合你?

    更新

    如何在 Java DSL 中使用&lt;header-channels-to-string&gt;

    .enrichHeaders(e -> e.headerChannelsToString())
    

    【讨论】:

    • 您好,您的解决方案是正确的,谢谢。我首先尝试用 Jms In/Out 适配器替换 Jms In/Out 网关,但在配置它们时也遇到了麻烦。然后我在您的解决方案中只尝试了带有replyDestination 的Jms Out Gateway,并且效果很好。我没有删除最初的@Gateway's replyChannel,正如你提到的,它是不需要的。谢谢!
    • 你好,我发现自己需要回到这篇文章......我有一个要求,我认为我确实需要使用单独的 JMS 入站/出站适配器来允许异步响应。为了做到这一点,我再次在 WsGateway 上设置了一个 replyChannel="out.ch",将 Jms.outboundGateway 切换为 Jms.outboundAdapter 和 Jms.inboundAdapter,并在发送之前在消息上设置了 JMSCorrelationID 标头,但是,当响应返回并放入 out.ch 时,WsGateway 不接收响应消息?我的 SI 流程实际上比这里显示的要复杂一些...
    • 在发送请求前尝试使用&lt;header-enricher&gt;&lt;header-channels-to-string&gt;
    • 如何在新的 DSL 表单中使用header-channels-to-string?是否只是将其设置为具有空值的标头名称的情况?
    • 是的,你是对的。所以,你必须做这个解决方法:github.com/spring-projects/spring-integration-extensions/blob/…
    猜你喜欢
    • 1970-01-01
    • 2021-11-20
    • 2018-10-07
    • 1970-01-01
    • 1970-01-01
    • 2018-12-10
    • 1970-01-01
    • 1970-01-01
    • 2014-07-31
    相关资源
    最近更新 更多