【Title】: How to properly send Mono<ResponseEntity> as JSON in a Netty Reactor HTTP Server response
【Posted】: 2020-08-12 10:02:05
【Question】:

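I am trying to figure out how to properly send a response from a Netty Reactor HTTP Server with a ResponseEntity as JSON.

My current implementation reacts to a request from a WebClient and should send back a response with some ResponseEntity status (let's assume just HTTP OK).

Unfortunately, I still get an InvalidDefinitionException on the client side saying that the instance cannot be constructed because there is no default constructor.

I know what that means, but Spring WebFlux, for example, can also have REST endpoints with a Mono<ResponseEntity> return type, and nothing goes wrong on the client side. So is it possible to correctly serialize the entity to JSON on the server side and deserialize it on the client side?

Here is my client: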

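import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

public Mono<ResponseEntity> postRequest(final Object body, final String uri) {
    return webClient.post()
            .uri(uri)
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromValue(body))
            .exchange()
            // Decodes the response body as a ResponseEntity; this is where Jackson is invoked.
            .flatMap(clientResponse -> clientResponse.toEntity(ResponseEntity.class));
}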

Here is my server:

    public void runWithPost(final String endpointPath, final ServerCallback callback) {
        server = HttpServer.create()
                .host(this.host)
                .port(this.port)
                .route(routes ->
                        routes.post(endpointPath, (request, response) ->
                                response.addHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                                        .sendString(Mono.just(getJSON(callback.handleCallback())))))
                .wiretap(true)
                .bindNow();

        System.out.println("Starting server...");
    }

    private String getJSON(final ResponseEntity responseEntity) {
        String json = StringUtils.EMPTY;
        try {
            json = objectMapper.writeValueAsString(responseEntity);
            System.out.println("Serialized JSON: " + json);
        } catch (final JsonProcessingException ex) {
            System.err.println("JSON serializer error: " + ex.getMessage());
        }

        return json;
    }

Here is the callback:

public interface ServerCallback {

    ResponseEntity handleCallback();

}

And the usage:

reactiveRestServer.runWithPost("/transaction", () -> ResponseEntity.ok().build());

Unfortunately, on the client side I do not get HTTP status OK but a deserialization exception:

2020-04-28 16:09:35.345 ERROR 15136 --- [ctor-http-nio-2] c.a.t.t.inbound.ArpMessageServiceImpl    : Type definition error: [simple type, class org.springframework.http.ResponseEntity]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.http.ResponseEntity` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 2]
2020-04-28 16:09:35.349  WARN 15136 --- [ctor-http-nio-2] io.netty.util.ReferenceCountUtil         : Failed to release a message: DefaultLastHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]

What am I missing?

【Comments】:

    Tags: json http mono reactor-netty


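    【Answer 1】:

    So I finally solved this. Here is the answer for anyone who runs into a similar problem. The issue is that Spring WebFlux converts a ResponseEntity into a DefaultFullHttpResponse, and that DefaultFullHttpResponse carries the headers, status, and body. I solved it by taking exactly the same approach: instead of serializing the ResponseEntity itself to JSON, I copy its status, headers, and body onto the server response.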

        public void runWithPost(final String endpointPath, final ServerCallback callback) {
            if (server == null || server.isDisposed()) {
                server = HttpServer.create()
                        .host(this.host)
                        .port(this.port)
                        .route(routes ->
                                routes.post(endpointPath, (request, response) -> processResponse(response, callback)))
                        .wiretap(true)
                        .bindNow();
    
                logger.info("Starting server...");
            } else {
                logger.info("Couldn't start server because one is already running!");
            }
        }
    

    The conversion happens here:

        private NettyOutbound processResponse(final HttpServerResponse response, final ServerCallback callback) {
            final ResponseEntity responseEntity = callback.handleCallback();

            // Status goes on the HTTP status line, not into the body.
            response.status(responseEntity.getStatusCodeValue());

            // Copy every header; buildValue (not shown) turns the value list into one header value.
            final HttpHeaders entityHeaders = responseEntity.getHeaders();
            entityHeaders.forEach((name, values) -> response.addHeader(name, buildValue(values)));

            // Body: strings are sent as-is, anything else is serialized by getBytesFromObject (not shown).
            if (responseEntity.hasBody()) {
                try {
                    final Object body = responseEntity.getBody();

                    if (body instanceof String) {
                        return response.sendString(Mono.just((String) body));
                    } else {
                        return response.send(Mono.just(Unpooled.wrappedBuffer(getBytesFromObject(body))));
                    }
                } catch (final IOException ex) {
                    response.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
                    // String.valueOf guards against a null message, which Mono.just would reject.
                    return response.sendString(Mono.just(String.valueOf(ex.getMessage())));
                }
            }

            // No body: complete the response with status and headers only.
            return response.send(Mono.empty());
        }
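
The conversion above calls `buildValue` without showing it. Below is a minimal sketch of what such a helper might look like, assuming it simply joins the values of a multi-valued header with commas. The class name `HeaderValues` and the joining rule are assumptions made for illustration, not the author's code. `getBytesFromObject` is not shown either; it presumably wraps a JSON serializer such as Jackson's `ObjectMapper.writeValueAsBytes`.

```java
import java.util.List;

public class HeaderValues {

    // Hypothetical stand-in for the answer's buildValue helper (not shown in the original).
    // Joins all values of one header into a single comma-separated header value.
    static String buildValue(final List<String> values) {
        if (values == null || values.isEmpty()) {
            return "";
        }
        return String.join(", ", values);
    }

    public static void main(final String[] args) {
        System.out.println(buildValue(List.of("application/json", "text/plain")));
    }
}
```

Comma-joining is fine for most list-valued headers, but it is not safe for `Set-Cookie`, which has to be sent as one header line per value.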
    

    Usage:

    mockReactiveRestServer.runWithPost("/transaction", () -> ResponseEntity.ok().build());
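
The core of this answer is that the status travels in the HTTP response itself rather than in a JSON body. That can be checked in isolation with a small sketch that deliberately swaps Reactor Netty and WebClient for the JDK's built-in `com.sun.net.httpserver.HttpServer` and `java.net.http.HttpClient`. The class name `StatusLineDemo` and the method `roundTrip` are made up for this illustration; only the `/transaction` path comes from the post.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StatusLineDemo {

    // Starts a throwaway server on a free local port, POSTs to it once,
    // and returns "<status seen by client>:<response body length>".
    static String roundTrip(final int status) {
        HttpServer server = null;
        try {
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/transaction", exchange -> {
                exchange.getRequestBody().readAllBytes();   // drain the request body
                exchange.sendResponseHeaders(status, -1);   // -1 means: send no response body
                exchange.close();
            });
            server.start();

            final URI uri = URI.create(
                    "http://127.0.0.1:" + server.getAddress().getPort() + "/transaction");
            final HttpRequest request = HttpRequest.newBuilder(uri)
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString("{}"))
                    .build();
            final HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());

            // The client learns the status without deserializing anything.
            return response.statusCode() + ":" + response.body().length();
        } catch (final IOException ex) {
            throw new UncheckedIOException(ex);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            if (server != null) {
                server.stop(0);
            }
        }
    }

    public static void main(final String[] args) {
        System.out.println(roundTrip(200));
    }
}
```

With the server from this answer the same holds for the WebClient side: the status arrives on the response itself, so the client does not need Jackson to reconstruct a ResponseEntity from the body.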
    
