【问题标题】:Facing io.netty.handler.timeout.ReadTimeoutException: null while consuming server sent events面对 io.netty.handler.timeout.ReadTimeoutException: null while using server sent events
【发布时间】:2021-08-19 20:49:23
【问题描述】:

我是 Spring Web Flux 的新手,我有一个使用服务器发送事件的客户端应用程序,事件由服务器随机发布,没有固定延迟。但是如果没有事件,消费者会在 60 秒后抛出io.netty.handler.timeout.ReadTimeoutException: null

服务器端事件消费者代码

webClient.get()
        .uri("http://localhost:8080/events")
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .bodyToFlux(type)
        .subscribe(event -> process(event));

即使长时间没有事件,我也需要连接客户端。

完全例外

[36mr.netty.http.client.HttpClientConnect   [...] The connection observed an error

io.netty.handler.timeout.ReadTimeoutException: null

reactor.Flux.MonoFlatMapMany.1    onError(org.springframework.web.reactive.function.client.WebClientRequestException: nested exception is io.netty.handler.timeout.ReadTimeoutException)
reactor.Flux.MonoFlatMapMany.1     

org.springframework.web.reactive.function.client.WebClientRequestException: nested exception is io.netty.handler.timeout.ReadTimeoutException
    at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141) ~[spring-webflux-5.3.5.jar:5.3.5]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):

【问题讨论】:

    标签: spring spring-boot spring-webflux project-reactor reactor-netty


    【解决方案1】:

    Mozilla description for server sent events中有一条说明:

    冒号作为一行的第一个字符本质上是一个注释,并且 被忽略。 注意:注释行可用于阻止连接 从超时;服务器可以定期发送评论以保持 连接有效。

    因此定期发送 cmets 可以保持连接处于活动状态。那么我们如何发送评论呢?

    spring 的类ServerSentEvent 具有函数ServerSentEvent#comment。因此,如果我们将此类与 Flux#interval 结合使用,我们可以合并仅包含 cmets keep alive 的事件。

    这是我不久前建立的一个项目的示例

    @Bean
    public RouterFunction<ServerResponse> foobars() {
        return route()
                .path("/api", builder -> builder
                    .GET("/foobar/{id}", accept(TEXT_EVENT_STREAM), request -> ok()
                            .contentType(MediaType.TEXT_EVENT_STREAM)
                            .header("Cache-Control", "no-transform")
                            .body(Flux.merge(foobarHandler.stream(request.pathVariable("id")),
                                    Flux.interval(Duration.ofSeconds(15)).map(aLong -> ServerSentEvent.<List<FoobarResponse>>builder()
                                            .comment("keep alive").build())), new ParameterizedTypeReference<ServerSentEvent<List<FoobarResponse>>>(){}))
                .build();
    }
    

    【讨论】:

      猜你喜欢
      • 2020-12-15
      • 2017-03-02
      • 1970-01-01
      • 2013-08-06
      • 2012-08-18
      • 2015-08-05
      • 1970-01-01
      • 2020-05-26
      相关资源
      最近更新 更多