【发布时间】:2018-07-05 06:37:02
【问题描述】:
SI 5+ 支持WebFlux,这意味着我们现在可以构建一个反应式消息传递系统。然而,这也意味着设计已经经过深思熟虑,通常的错误处理方法不起作用。在响应式流中,消息是Publisher(Flux),它不会引发异常,但会发出错误通知。因此,消息上设置的错误通道标头是无用的,因为 SI 不知道Flux 导致了错误。
考虑以下代码:
.handle(WebFlux.outboundGateway(m -> m.getPayload().toString(), webClient)
.expectedResponseType(YelpRecord.class)
.httpMethod(GET)
.mappedRequestHeaders(ACCEPT)
.replyPayloadToFlux(true))
.handle((GenericHandler<Flux<YelpRecord>>) (flux, headers) ->
flux
.doOnError(t -> log.error(t.getMessage(), t))
.doAfterTerminate(() ->
log.info("Completed streaming from: {}.", headers.get(DOWNLOAD_URI_HEADER))
)
.onBackpressureBuffer(
yelpArtifactoryProperties.getOnBackpressureBufferSize(),
BufferOverflowStrategy.ERROR)
)
上面的代码 sn-p 中缺少的是将异常发送到在来自doOnError 的消息上配置的错误通道。我们该怎么做?
【问题讨论】:
标签: spring spring-integration reactive-programming spring-webflux