【Posted】: 2020-06-10 11:42:29
【Question】:
An article, AWS S3 with Java – Reactive, describes how to use the AWS SDK 2.0 client with WebFlux.
In the example, they use the following handler to upload to S3 and then return an HTTP Created response:
@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
        @RequestBody Flux<ByteBuffer> body) {
    long length = headers.getContentLength();
    String fileKey = UUID.randomUUID().toString();
    Map<String, String> metadata = new HashMap<String, String>();
    // Typed future so that the response is a PutObjectResponse further down
    CompletableFuture<PutObjectResponse> future = s3client
        .putObject(PutObjectRequest.builder()
                .bucket(s3config.getBucket())
                .contentLength(length)
                .key(fileKey)
                .contentType(MediaType.APPLICATION_OCTET_STREAM.toString())
                .metadata(metadata)
                .build(),
            AsyncRequestBody.fromPublisher(body));
    // The returned Mono completes only after the S3 future has completed
    return Mono.fromFuture(future)
        .map((response) -> {
            checkResult(response);
            return ResponseEntity
                .status(HttpStatus.CREATED)
                .body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
        });
}
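This works as expected. While trying to learn WebFlux, I expected the following to complete the HTTP upload to S3 asynchronously, on the same thread that calls the subscribe method: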
@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers, @RequestBody Flux<ByteBuffer> body) {
    long length = headers.getContentLength();
    String fileKey = UUID.randomUUID().toString();
    Map<String, String> metadata = new HashMap<String, String>();
    Mono<PutObjectResponse> putObjectResponseMono = Mono.fromFuture(s3client
        .putObject(PutObjectRequest.builder()
                .bucket(s3config.getBucket())
                .contentLength(length)
                .key(fileKey)
                .contentType(MediaType.APPLICATION_OCTET_STREAM.toString())
                .metadata(metadata)
                .build(),
            AsyncRequestBody.fromPublisher(body)));
    putObjectResponseMono
        .doOnError((e) -> {
            log.error("Error putting object to S3 " + Thread.currentThread().getName(), e);
        })
        .subscribe((response) -> {
            log.info("Response from S3: " + response.toString() + " on " + Thread.currentThread().getName());
        });
    return Mono.just(ResponseEntity
        .status(HttpStatus.CREATED)
        .body(new UploadResult(HttpStatus.CREATED, new String[]{fileKey})));
}
The HTTP POST completes as expected, but the S3 put request fails with the following log message:
2020-06-10 12:31:22.275 ERROR 800 --- [tyEventLoop-0-4] c.b.aws.reactive.s3.UploadResource : Error happened on aws-java-sdk-NettyEventLoop-0-4
software.amazon.awssdk.core.exception.SdkClientException: 400 BAD_REQUEST "Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.baeldung.aws.reactive.s3.UploadResult>> com.baeldung.aws.reactive.s3.UploadResource.uploadHandler(org.springframework.http.HttpHeaders,reactor.core.publisher.Flux<java.nio.ByteBuffer>)"
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97) ~[sdk-core-2.10.27.jar:na]
at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98) ~[sdk-core-2.10.27.jar:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:125) ~[sdk-core-2.10.27.jar:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:107) ~[sdk-core-2.10.27.jar:na]
........
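I suspect the explanation has to do with the request to S3 running on its own thread, but I am having trouble working out what goes wrong. Can you explain?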
【Comments】:
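One plausible reading of the "Request body is missing" error, offered as a toy model and not as a confirmed diagnosis: the body Flux belongs to the HTTP exchange, so once the handler returns an already-completed Mono the framework may finish the exchange and release the body before the S3 client has read it. The sketch below reproduces that ordering in plain Java with CompletableFuture. It uses no Spring or AWS SDK types, and DetachedUploadDemo, RequestBody, upload and trigger are hypothetical names invented for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Toy model only: no Spring or AWS SDK types are involved, and every name here is hypothetical.
final class DetachedUploadDemo {

    /** Stand-in for the request body: readable only while the exchange is still open. */
    static final class RequestBody {
        private volatile boolean open = true;

        String read() {
            if (!open) {
                throw new IllegalStateException("Request body is missing");
            }
            return "payload";
        }

        void close() {
            open = false;
        }
    }

    /** Stand-in for the async client: it reads the body later, when trigger completes. */
    static CompletableFuture<String> upload(RequestBody body, CompletableFuture<Void> trigger) {
        return trigger.thenApply(ignored -> "stored " + body.read());
    }

    /** First handler shape: the response is derived from the upload result. */
    static String chained() {
        RequestBody body = new RequestBody();
        CompletableFuture<Void> trigger = new CompletableFuture<>();
        CompletableFuture<String> response =
                upload(body, trigger).thenApply(r -> "201 after " + r);
        // The "framework" releases the body once the response is done.
        response.whenComplete((r, e) -> body.close());
        // The client now reads the body, which is still open.
        trigger.complete(null);
        return response.join();
    }

    /** Second handler shape: the upload is detached and the response is already complete. */
    static String detached() {
        RequestBody body = new RequestBody();
        CompletableFuture<Void> trigger = new CompletableFuture<>();
        CompletableFuture<String> uploadResult = upload(body, trigger);
        CompletableFuture<String> response = CompletableFuture.completedFuture("201");
        // The response is already complete, so the body is released immediately.
        response.whenComplete((r, e) -> body.close());
        // The client reads the body too late and the upload fails.
        trigger.complete(null);
        String outcome = uploadResult
                .handle((r, e) -> e == null ? r : "failed: " + rootCause(e).getMessage())
                .join();
        return response.join() + " / upload " + outcome;
    }

    static Throwable rootCause(Throwable t) {
        return t.getCause() == null ? t : rootCause(t.getCause());
    }

    public static void main(String[] args) {
        System.out.println(chained());
        System.out.println(detached());
    }
}
```

If this model matches what WebFlux does, the failure is about lifetime, not about which thread runs the upload: the first handler keeps the exchange open until S3 answers, while the second lets it end first. Keeping the upload inside the returned Mono, as the article's handler does, avoids the problem.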
Tags: java amazon-web-services amazon-s3 spring-webflux reactor