【Question title】: How to put an object into S3 using WebFlux asynchronously?
【Posted】: 2020-06-10 11:42:29
【Question description】:

The article AWS S3 with Java – Reactive describes how to use the AWS SDK 2.0 client with WebFlux.

In the example, they use the following handler to upload to S3 and then return an HTTP Created response:

@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
  @RequestBody Flux<ByteBuffer> body) {

    long length = headers.getContentLength();
    String fileKey = UUID.randomUUID().toString();
    Map<String, String> metadata = new HashMap<String, String>();

    CompletableFuture<PutObjectResponse> future = s3client
      .putObject(PutObjectRequest.builder()
        .bucket(s3config.getBucket())
        .contentLength(length)
        .key(fileKey.toString())
        .contentType(MediaType.APPLICATION_OCTET_STREAM.toString())
        .metadata(metadata)
        .build(), 
      AsyncRequestBody.fromPublisher(body));

    return Mono.fromFuture(future)
      .map((response) -> {
        checkResult(response);
        return ResponseEntity
          .status(HttpStatus.CREATED)
          .body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
      });
}

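This works as intended. While trying to learn WebFlux, I expected the following to complete the HTTP upload to S3 asynchronously, in the same thread that invokes the subscribe method: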

@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers, @RequestBody Flux<ByteBuffer> body) {

    long length = headers.getContentLength();
    String fileKey = UUID.randomUUID().toString();
    Map<String, String> metadata = new HashMap<String, String>();

    Mono<PutObjectResponse> putObjectResponseMono = Mono.fromFuture(s3client
        .putObject(PutObjectRequest.builder()
            .bucket(s3config.getBucket())
            .contentLength(length)
            .key(fileKey.toString())
            .contentType(MediaType.APPLICATION_OCTET_STREAM.toString())
            .metadata(metadata)
            .build(),
        AsyncRequestBody.fromPublisher(body)));

    putObjectResponseMono
        .doOnError((e) -> {
            log.error("Error putting object to S3 " + Thread.currentThread().getName(), e);
        })
        .subscribe((response) -> {
            log.info("Response from S3: " + response.toString() + " on " + Thread.currentThread().getName());
        });

    return Mono.just(ResponseEntity
        .status(HttpStatus.CREATED)
        .body(new UploadResult(HttpStatus.CREATED, new String[]{fileKey})));
}

The HTTP POST completes as expected, but the S3 put request fails with the following log message:

2020-06-10 12:31:22.275 ERROR 800 --- [tyEventLoop-0-4] c.b.aws.reactive.s3.UploadResource       : Error happened on aws-java-sdk-NettyEventLoop-0-4

software.amazon.awssdk.core.exception.SdkClientException: 400 BAD_REQUEST "Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.baeldung.aws.reactive.s3.UploadResult>> com.baeldung.aws.reactive.s3.UploadResource.uploadHandler(org.springframework.http.HttpHeaders,reactor.core.publisher.Flux<java.nio.ByteBuffer>)"
    at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97) ~[sdk-core-2.10.27.jar:na]
    at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98) ~[sdk-core-2.10.27.jar:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:125) ~[sdk-core-2.10.27.jar:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:107) ~[sdk-core-2.10.27.jar:na]
    ........

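I suspect the explanation involves the request to S3 running on its own thread, but I am struggling to work out what is going wrong. Can you explain?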

【Comments】:

    Tags: java amazon-web-services amazon-s3 spring-webflux reactor


    【Solution 1】:

    Try this:

    Replace
        @RequestBody Flux<ByteBuffer> body
    with
        @RequestBody byte[] body

    and replace
        AsyncRequestBody.fromPublisher(body)
    with
        AsyncRequestBody.fromBytes(body)
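
A possible reason the byte[] variant behaves differently: with `@RequestBody byte[]`, Spring reads the whole body before the handler is invoked, so the bytes are already in memory when the S3 client starts uploading, whereas a `Flux<ByteBuffer>` is only consumed once something subscribes to it. The sketch below is a JDK-only analogy of that difference, not Spring or AWS SDK code. `BodyLifetimeSketch`, `StreamedBody`, and `upload` are hypothetical names, and closing the body as soon as the handler returns is an assumption about what happens in the fire-and-forget handler from the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// JDK-only analogy; none of these types come from Spring or the AWS SDK.
final class BodyLifetimeSketch {

    /** Hypothetical streamed body: readable only until the exchange is closed. */
    static final class StreamedBody {
        private final byte[] data;
        private boolean open = true;

        StreamedBody(byte[] data) {
            this.data = data.clone();
        }

        /** Simulates the framework releasing the request once the response is written. */
        void close() {
            open = false;
        }

        byte[] read() {
            if (!open) {
                throw new IllegalStateException("Request body is missing");
            }
            return data.clone();
        }
    }

    /**
     * Hypothetical stand-in for the async put: the body is read only when
     * the 'start' future completes, i.e. some time after this call returns.
     */
    static CompletableFuture<Integer> upload(Supplier<byte[]> body, CompletableFuture<Void> start) {
        return start.thenApply(ignored -> body.get().length);
    }

    public static void main(String[] args) {
        // Fire-and-forget with a streamed body: the "handler" returns and the
        // exchange is closed before the upload gets to read anything.
        StreamedBody streamed = new StreamedBody(new byte[] {1, 2, 3});
        CompletableFuture<Void> start1 = new CompletableFuture<>();
        CompletableFuture<Integer> lazyUpload = upload(streamed::read, start1);
        streamed.close();
        start1.complete(null);
        System.out.println("streamed upload failed: " + lazyUpload.isCompletedExceptionally());

        // Buffered body: the bytes are copied while the exchange is still open,
        // so closing it later does not affect the upload.
        StreamedBody second = new StreamedBody(new byte[] {1, 2, 3});
        byte[] buffered = second.read();
        CompletableFuture<Void> start2 = new CompletableFuture<>();
        CompletableFuture<Integer> eagerUpload = upload(() -> buffered, start2);
        second.close();
        start2.complete(null);
        System.out.println("buffered upload size: " + eagerUpload.join());
    }
}
```

The trade-off of the byte[] approach is that the entire upload is held in memory, which may not suit large files.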
    

    If you want to subscribe from another thread, use: .subscribeOn({Scheduler})
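
The `.subscribeOn({Scheduler})` hint is about choosing the thread on which the subscription, and therefore the source work, runs. The sketch below does not use Reactor; as a JDK-only analogue it runs a task on an explicitly chosen executor with `CompletableFuture.supplyAsync` and reports which thread executed it. `OtherThreadSketch`, `runOnWorker`, and the thread name `upload-worker` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogue of choosing the thread that runs the work; not Reactor code.
final class OtherThreadSketch {

    /** Runs a task on a dedicated single thread and returns the name of the thread that executed it. */
    static String runOnWorker(String workerName) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, workerName);
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), worker)
                .join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("work ran on: " + runOnWorker("upload-worker"));
    }
}
```

In Reactor the corresponding call on the Mono from the question would look like `putObjectResponseMono.subscribeOn(Schedulers.boundedElastic())`. That only changes where the subscription happens, so it is not obvious that it alone would address the "Request body is missing" error.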
