【发布时间】:2020-02-13 11:27:40
【问题描述】:
所以我是响应式编程的新手,我写了一些我想测试的代码。这些更多是集成测试,因为我正在实时复制文件并稍后检查它们是否相同。我有一个MockWebServer 嘲笑我的回应是4xx,这在代码中得到了很好的处理。不幸的是,我也得到了io.netty.handler.timeout.ReadTimeoutException,它覆盖了我的自定义WebClientResponseException,所以在测试中我得到了错误的异常。基本上我有两个问题,我到底为什么会得到这个io.netty.handler.timeout.ReadTimeoutException 异常?由于某种原因,它只出现在doOnError() 方法之后,我不确定它为什么会发生。
现在的代码是同步的,我很清楚这一点。
第二个问题是,在给定次数的重试后,如何处理测试中的自定义异常?现在它是 3,然后我才希望抛出我的另一个异常。
代码如下:
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);
Flux<DataBuffer> fileDataStream = Mono.just(filePath)
.map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
.map(bytes -> webClient
.get()
.uri(uri)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.header("Range", String.format("bytes=%d-", bytes))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
.onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
.bodyToFlux(DataBuffer.class)
.doOnError(throwable -> log.info("fileDataStream onError", throwable))
)
.flatMapMany(Function.identity());
return DataBufferUtils
.write(fileDataStream, fileChannel)
.map(DataBufferUtils::release)
.doOnError(throwable -> {
try {
fileChannel.force(true);
} catch (IOException e) {
throw new WritingException("failed force update to file channel", e);
}
})
.retry(3)
.doOnComplete(() -> {
try {
fileChannel.force(true);
} catch (IOException e) {
log.warn("failed force update to file channel", e);
throw new WritingException("failed force update to file channel", e);
}
})
.doOnError(throwable -> targetPath.toFile().delete())
.then(Mono.just(target));
响应是Mono<Path>,因为我只对新创建和复制的文件的Path 感兴趣。
欢迎任何有关代码的 cmets。
复制机制是基于这个线程Downlolad and save file from ClientRequest using ExchangeFunction in Project Reactor制作的
【问题讨论】:
标签: java reactive-programming webclient spring-webflux project-reactor