【Question title】: Spring 5 Web Reactive Programming - WebClient ClassCastException when unmarshalling JSON from Spring Reactive Controller that streams data
【Posted】: 2016-08-05 07:50:42
【Question description】:

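This question is related to this one, in which I asked how to stream data from a Reactive Spring Controller.

As Rossen pointed out, we have to use text/event-stream to send the streamed results back as server-sent events. So far so good.

I have a service like this: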

@GetMapping(value="/accounts/alertsStreaming", headers="accept=text/event-stream")
public Flux<Alert> getAccountAlertsStreaming() {
    return Flux.fromArray(new Alert[]{new Alert((long)1, "Alert message"), 
                                      new Alert((long)2, "Alert message2"),
                                      new Alert((long)3, "Alert message3")})
               .delayMillis(1000)
               .log();
}

When I call it from a browser, the 3 results start arriving one second apart.

I want to call this service from a WebClient, and implemented it this way:

@Component
public class AccountsServiceClient {

    @Autowired
    private WebClient webClient;

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){
        Flux<Alert> response = webClient
                .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream"))
                .extract(bodyStream(Alert.class));
        return response;
    }       
}

This is the test code:

@Test
@ContextConfiguration(classes={WebClientConfig.class, AccountsServiceClient.class})
public class AccountsServiceClientTest extends AbstractTestNGSpringContextTests{

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private AccountsServiceClient client;

    public void testNumbersServiceClientStreamingTest() throws InterruptedException{

        CountDownLatch latch = new CountDownLatch(1);

        Flux<Alert> alerts = client.getAccountAlertsStreaming("http://localhost:8080");
        alerts.doOnComplete( () -> {
            latch.countDown();
        }).subscribe( (n) -> {
            logger.info("------------> GOT ALERT {}", n);
        });

        latch.await();
    }
}

The problem is that when the client tries to extract the result, none of the registered HttpMessageReaders can read the combination text/event-stream + Alert.class:

public class ResponseExtractors {

    protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders,
                ResolvableType responseType, MediaType contentType) {

            return messageReaders.stream()
                    .filter(e -> e.canRead(responseType, contentType))
                    .findFirst()
                    .orElseThrow(() ->
                            new WebClientException(
                                    "Could not decode response body of type '" + contentType
                                            + "' with target type '" + responseType.toString() + "'"));
    }
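
To see why the lookup comes up empty, here is a minimal, framework-free sketch of the same first-match selection. The `Reader` interface and the two `canRead` rules are hypothetical stand-ins, not Spring's actual readers. They only assume that a JSON reader is tied to a JSON media type, while a String reader does not care about the media type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ReaderLookupSketch {

    // Hypothetical stand-in for HttpMessageReader: only the canRead decision is modelled.
    interface Reader {
        boolean canRead(Class<?> targetType, String contentType);
    }

    // Same shape as resolveMessageReader: the first reader that accepts the pair wins.
    static Optional<Reader> resolve(List<Reader> readers, Class<?> targetType, String contentType) {
        return readers.stream()
                .filter(r -> r.canRead(targetType, contentType))
                .findFirst();
    }

    public static void main(String[] args) {
        // Assumed rules, for illustration only.
        Reader jsonReader = (type, contentType) -> contentType.equals("application/json");
        Reader stringReader = (type, contentType) -> type == String.class;
        List<Reader> readers = Arrays.asList(jsonReader, stringReader);

        // Object.class stands in for Alert.class, which is not shown in the question.
        System.out.println(resolve(readers, Object.class, "text/event-stream").isPresent()); // false
        System.out.println(resolve(readers, String.class, "text/event-stream").isPresent()); // true
    }
}
```

Under those assumptions the first lookup finds nothing, which is the case that `orElseThrow` turns into the `WebClientException`, while asking for `String.class` does find a reader.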

The exception:

reactor.core.Exceptions$BubblingException: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert'
    at reactor.core.Exceptions.bubble(Exceptions.java:97)
    at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263)
    at reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:126)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:183)
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:128)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:169)
    at reactor.core.publisher.FluxLog$LoggerSubscriber.doNext(FluxLog.java:161)
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123)
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75)
    at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:103)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010)
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70)
    at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:71)
    at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:120)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert'
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$resolveMessageReader$23(ResponseExtractors.java:203)
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$61/1950155746.get(Unknown Source)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.springframework.web.client.reactive.ResponseExtractors.resolveMessageReader(ResponseExtractors.java:200)
    at org.springframework.web.client.reactive.ResponseExtractors.decodeResponseBody(ResponseExtractors.java:181)
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$null$12(ResponseExtractors.java:89)
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$36/70386506.apply(Unknown Source)
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:126)
    ... 37 common frames omitted

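【Comments】:

  • From the server's point of view, what is the difference between the WebClient and a web browser?
  • I see your point, but, new semantics aside, if the results are not going to be streamed and we get them all at once instead, what is the point of using the new WebClient rather than the old RestTemplate?

Tags: spring rx-java reactive-programming project-reactor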


【Solution 1】:

There is already an issue open for this. Please comment on or vote for SPR-14539.

【Discussion】:

    【Solution 2】:

    Maybe this should be handled automatically by the framework. In any case, I solved it by unmarshalling the streamed JSON data myself:

    WebClientConfig:

    @Configuration
    public class WebClientConfig {
    
        @Bean
        public ObjectMapper jacksonObjectMapper(){
            return new ObjectMapper();
        }
    
        @Bean
        public WebClient webClient(){
            WebClient webClient = new WebClient(new ReactorClientHttpConnector());
            return webClient;
        }
    
    }
    

    Service client:

    @Component
    public class AccountsServiceClient {
    
        @Autowired
        private WebClient webClient;
    
        @Autowired
        private ObjectMapper jacksonObjectMapper;
    
        public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){
            Flux<Alert> response = webClient
                    .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream"))
                    .extract(bodyStream(String.class))
                    .map((e -> {
                        try {
                            e = e.substring(e.indexOf(":")+1);
                            Alert a = jacksonObjectMapper.readValue(e, Alert.class);
                            return a;
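                            } catch (Exception e1) {
                                // Propagate the failure as an error signal; Reactor's map() must not return null
                                throw new IllegalStateException("Could not parse alert: " + e, e1);
                            }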
    
                    }));
            return response;
        }
    
    }
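
The prefix-stripping step above takes everything after the first colon, which works as long as every element is a single `data:` line. As a slightly more defensive sketch of that one step, in plain Java with no Spring involved: the `SseDataLines` class and `dataPayloads` method are hypothetical names, and the JSON field names in the sample are assumptions, since the `Alert` class is not shown. It keeps only `data:` lines, skips comments and other fields, and treats each `data:` line as its own payload rather than joining multi-line events.

```java
import java.util.ArrayList;
import java.util.List;

public class SseDataLines {

    // Returns the payload of every "data:" line found in a chunk of event-stream text.
    static List<String> dataPayloads(String chunk) {
        List<String> payloads = new ArrayList<>();
        for (String line : chunk.split("\\r?\\n")) {
            if (!line.startsWith("data:")) {
                continue; // blank separators, ":" comments, and other fields such as "id:" or "event:"
            }
            String value = line.substring("data:".length());
            // The event-stream format allows one optional space after the colon.
            if (value.startsWith(" ")) {
                value = value.substring(1);
            }
            payloads.add(value);
        }
        return payloads;
    }

    public static void main(String[] args) {
        // Sample input; the JSON shape is assumed for illustration.
        String chunk = "data:{\"id\":1,\"message\":\"Alert message\"}\n\n"
                + ":keep-alive comment\n"
                + "data: {\"id\":2,\"message\":\"Alert message2\"}\n\n";
        dataPayloads(chunk).forEach(System.out::println);
    }
}
```

Each returned string could then be handed to `jacksonObjectMapper.readValue(payload, Alert.class)` in place of the `substring` result.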
    

    Update: as of Spring 5 M4 this is handled by the framework. You can check the solution using the new API here: Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux?

    【Discussion】:
