【问题标题】:Spring Integration Java DSL -- Configuration of aggregatorSpring Integration Java DSL——聚合器的配置
【发布时间】:2016-04-20 11:49:44
【问题描述】:

我有一个非常简单的集成流程,其中使用发布-订阅通道将 RESTful 请求转发给两个提供者。然后将两个 RESTful 服务的结果聚合到一个数组中。集成流程示意图如下:

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                ).subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class)
                        )
                )
            )
            .aggregate()
            .get();
}

但是,在运行我的代码时,生成的数组只包含由 RESTful 服务之一返回的项目。有没有我遗漏的配置步骤?

更新

以下版本对应完整解决方案,考虑到Artem的cmets。

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel-scatter")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather"))
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather")))
            .get();
}

@Bean
IntegrationFlow gatherFlow() {
    return IntegrationFlows.from("inputChannel-gather")
            .aggregate(a -> a.outputProcessor(g ->  new GenericMessage<ItemDTO[]>(
                        g.getMessages().stream()
                                .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
                                .collect(Collectors.toList()).toArray(new ItemDTO[0]))))
            .get();
}

【问题讨论】:

    标签: java spring-integration dsl aggregator


    【解决方案1】:

    其实不然。

    .aggregate()publishSubscribeChannel第三个订阅者。

    您必须将您的流量切断到其中两个。像这样:

        @Bean
        public IntegrationFlow publishSubscribeFlow() {
            return flow -> flow
                    .publishSubscribeChannel(s -> s
                            .applySequence(true)
                            .subscribe(f -> f
                                    .handle((p, h) -> "Hello")
                                    .channel("publishSubscribeAggregateFlow.input"))
                            .subscribe(f -> f
                                    .handle((p, h) -> "World!")
                                    .channel("publishSubscribeAggregateFlow.input"))
                    );
        }
    
        @Bean
        public IntegrationFlow publishSubscribeAggregateFlow() {
            return flow -> flow
                    .aggregate(a -> a.outputProcessor(g -> g.getMessages()
                            .stream()
                            .<String>map(m -> (String) m.getPayload())
                            .collect(Collectors.joining(" "))))
                    .channel(c -> c.queue("subscriberAggregateResult"));
        }
    

    请注意两位订阅者对.channel("publishSubscribeAggregateFlow.input") 的使用。

    说实话,这是任何publish-subscribe 的重点。如果我们要聚合它们,我们必须知道将所有订阅者的结果发送到哪里。

    您的用例让我想起了Scatter-Gather EIP 模式。

    我们还没有在 DSL 中实现它。 请随时就此事提出GH issue,我们将在即将发布的1.2 版本中尝试处理。

    更新

    关于此事的GH问题:https://github.com/spring-projects/spring-integration-java-dsl/issues/75

    【讨论】:

    • 非常感谢您帮助 Artem。事实上,我之前尝试过使用渠道和分离流,但没有成功,因为我也遇到了聚合器的问题。你的回答也给了我关于如何编写聚合器的提示。
    • 太棒了!但是 Scatter-Gather 将作为 DSL 的一个很好的补充。所以,无论如何都不要羞于提出 GH 问题!