【发布时间】:2016-04-20 11:49:44
【问题描述】:
我有一个非常简单的集成流程,其中使用发布-订阅通道将 RESTful 请求转发给两个提供者。然后将两个 RESTful 服务的结果聚合到一个数组中。集成流程示意图如下:
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
).subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class)
)
)
)
.aggregate()
.get();
}
但是,在运行我的代码时,生成的数组只包含由 RESTful 服务之一返回的项目。有没有我遗漏的配置步骤?
更新
以下版本对应完整解决方案,考虑到Artem的cmets。
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel-scatter")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather"))
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather")))
.get();
}
@Bean
IntegrationFlow gatherFlow() {
return IntegrationFlows.from("inputChannel-gather")
.aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>(
g.getMessages().stream()
.flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
.collect(Collectors.toList()).toArray(new ItemDTO[0]))))
.get();
}
【问题讨论】:
标签: java spring-integration dsl aggregator