【问题标题】:Error while connecting to Spring Boot RSocket server from RSocket-Java Client从 RSocket-Java 客户端连接到 Spring Boot RSocket 服务器时出错
【发布时间】:2020-04-11 14:14:33
【问题描述】:

我在通过 TCP 连接到 Spring Boot RSocket 应用程序时遇到问题。使用 RSocketRequester 时客户端工作正常,但是当我尝试使用 RSocketFactory 客户端连接时,它不断出现错误。代码如下。

        RSocket rSocket = this.client = RSocketFactory
            .connect()
            .mimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.toString(), MediaType.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(TcpClientTransport.create("localhost", 7000))
            .start()
            .block();


Flux<Payload> s = rSocket.requestStream(DefaultPayload.create("1234", "socket"));
    s.subscribe();

这会产生如下错误:

java.lang.IndexOutOfBoundsException: readerIndex(1) + length(115) exceeds writerIndex(6): AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 1, widx: 6, cap: 6/6, unwrapped: PooledUnsafeDirectByteBuf(ridx: 27, widx: 27, cap: 1024))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1477)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1463)
at io.netty.buffer.AbstractByteBuf.readSlice(AbstractByteBuf.java:880)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:47)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:37)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extractEntry(DefaultMetadataExtractor.java:136)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extract(DefaultMetadataExtractor.java:119)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.createHeaders(MessagingRSocket.java:195)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.handleAndReply(MessagingRSocket.java:167)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.requestStream(MessagingRSocket.java:127)
at io.rsocket.RSocketResponder.requestStream(RSocketResponder.java:207)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:310)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:830)

据我了解,这个特殊错误是由于 netty 消息包装(来自 stackoverflow 上的其他线程),但如何解决呢? 服务器是 Spring Boot 5+ RSocket,但客户端只使用 RSocket-Java。

【问题讨论】:

    标签: spring-boot spring-messaging rsocket rsocket-java


    【解决方案1】:

    问题在于 MIME 类型。 在您的情况下,服务器等待CBOR,但您继续进行application/json

    代码解决方案:改变RSocketRequester的初始化方式,如下例所示,你的客户端发送CBOR,你可以通过启用调试看到:logging.level.io.rsocket.FrameLogger: DEBUG。这就是hello world的全部内容,无需在客户端自定义策略或工厂实现

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies strategies) {
        return RSocketRequester
                .builder()
                .rsocketStrategies(strategies)
                .connectTcp("127.0.0.1", 7000)
                .retry(5)
                .block();
        }
    

    顺便说一句,即使在两侧都使用自定义 EncoderDecoder ,我也没有在两侧使用 JSON 达到解决方案。我猜这里的原因是没有CBOR to Jackson 转换器,反之亦然:org.springframework.http.codec.cbor.Jackson2CborEncoder

    【讨论】:

    • 感谢您的回复。我没有在客户端使用 RSocketRequester。问题是当我放置元数据时(对于路由,这里的“套接字”是服务器上的路由@MessageMapping)。如果没有元数据,我可以很好地在服务器上获得有效负载。我试图弄清楚 Spring-Rsocket 如何进行路由映射并将其发送到元数据中。
    • 据我了解:客户端在元数据route: some 发送,rsocket 服务器使用PathPatternRouteMatcher 解析它
    • 我想这就是我想要理解的。在路由中放置什么(编码明智),以便默认 PathPatternRouteMater 可以路由到正确的端点(服务器使用来自 Spring 的 @MessageMapping)。如果那很复杂,如何放置既对 Spring 友好又与 rSocket 一起使用的自定义 RouteMatcher。我已经更新了我的代码以使其易于理解。
    【解决方案2】:

    来自this,使用以下内容生成元数据。

    CompositeByteBuf metadata = ByteBufAllocator.DEFAULT.compositeBuffer();
    RoutingMetadata routingMetadata = TaggingMetadataCodec.createRoutingMetadata(ByteBufAllocator.DEFAULT, List.of("/route"));
    CompositeMetadataCodec.encodeAndAddMetadata(metadata,
            ByteBufAllocator.DEFAULT,
            WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
            routingMetadata.getContent());
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-09-19
      • 2022-10-24
      • 1970-01-01
      • 2016-06-08
      • 2021-05-15
      • 1970-01-01
      • 2022-11-05
      • 1970-01-01
      相关资源
      最近更新 更多