【问题标题】:Consume json stream data in Netty/Ratpack在 Netty/Ratpack 中消费 json 流数据
【发布时间】:2017-02-01 13:21:52
【问题描述】:

我想使用 Netty 或 Ratpack 使用 json 数据流。我的用例是请求正文将包含大型 json 数据(以 MB 为单位的 json 数组)。处理数据的一种方法是阻塞,直到收到完整的数据,然后开始处理。但是,我想要异步处理的意思,只要收到一大块 json 对象就处理它。

我在 Netty 中遇到了 JsonObjectDecoder,但我没有运气使用它。 这是我的 ChannelInitializer 类:

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();

        p.addLast(new JsonObjectDecoder(true));

        // HttpServerCodec is a combination of HttpRequestDecoder and HttpResponseEncoder
        p.addLast(new HttpServerCodec());
        //
        // add gizp compressor for http response content
        p.addLast(new HttpContentCompressor());

        p.addLast(new HttpObjectAggregator(1048576));

        p.addLast(new ChunkedWriteHandler());

        p.addLast(new ServerHandler());
    }
} 

我正在发送这些数据:

[
    {
        "timestamp": "2016-11-14 11:08:09+0100", 
        "message": "message 120", 
        "hostname": "myhost.com", 
        "device_product": "product123", 
        "device_vendor": "vendor123", 
        "device_version": "1", 
        "severity": "High"
    },
    .....
    {
        "timestamp": "2016-11-14 11:08:09+0100", 
        "message": "message 121", 
        "hostname": "myhost.com", 
        "device_product": "product123", 
        "device_vendor": "vendor123", 
        "device_version": "1", 
        "severity": "High"
    }
]

但我收到此错误:

io.netty.handler.codec.CorruptedFrameException: invalid JSON received at byte position 0: 504f5354202f6c6f677320485454502f312e310d0a486f73743a206c6f63616c686f73743a383038300d0a436f6e6e656374696f6e3a206b6565702d616c6976650d0a436f6e74656e742d4c656e6774683a203230380d0a4163636570743a206170706c69636174696f6e2f6a736f6e0d0a506f73746d616e2d546f6b656e3a2062383064306264352d663234302d346563622d353631322d3863376139396434633934360d0a43616368652d436f6e74726f6c3a206e6f2d63616368650d0a4f726967696e3a206368726f6d652d657874656e73696f6e3a2f2f6668626a676269666c696e6a62646767656863646463626e636464646f6d6f700d0a557365722d4167656e743a204d6f7a696c6c612f352e30202857696e646f7773204e5420362e313b2057696e36343b2078363429204170706c655765624b69742f3533372e333620284b48544d4c2c206c696b65204765636b6f29204368726f6d652f35352e302e323838332e3837205361666172692f3533372e33360d0a436f6e74656e742d547970653a206170706c69636174696f6e2f6a736f6e0d0a4163636570742d456e636f64696e673a20677a69702c206465666c6174652c2062720d0a4163636570742d4c616e67756167653a20656e2d55532c656e3b713d302e382c6a613b713d302e362c66722d46523b713d302e342c66723b713d302e322c66722d43413b713d302e320d0a0d0a7b2274696d657374616d70223a2022323031362d31312d31342031313a30383a30392b30313030222c226d657373616765223a20226d65737361676520313230222c22686f73746e616d65223a20226d79686f73742e636f6d222c200a09226465766963655f70726f64756374223a202270726f64756374313233222c200a09226465766963655f76656e646f72223a202276656e646f72313233222c200a09226465766963655f76657273696f6e223a202231222c200a09227365766572697479223a202248696768220a090a097d
    at io.netty.handler.codec.json.JsonObjectDecoder.decode(JsonObjectDecoder.java:163)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:316)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
    at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
    at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
    at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
    at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
    at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
    at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)

我不知道我错过了什么。 如果有人也知道使用 Ratpack 实现此目的的方法,请帮助我。 提前致谢。

【问题讨论】:

    标签: stream netty nio nonblocking ratpack


    【解决方案1】:

    问题在于 JSON 解码器是管道中的第一个处理程序,它正在尝试解码 HTTP 帖子。如果我从您发布的错误消息中获取无效数据流,请将其解析回字节并从中创建一个字符串(在 groovy 中)...

    import javax.xml.bind.DatatypeConverter;
    v = "504f5354202f6c6f677320485...<snip>";
    byte[] bytes = DatatypeConverter.parseHexBinary(v);
    println new String(bytes)
    

    结果是:

    POST /logs HTTP/1.1 主机:localhost:8080 连接:keep-alive 内容长度:208 接受:应用程序/json 邮递员令牌: 缓存控制:无缓存来源: chrome-extension:// 用户代理: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, 像壁虎)Chrome/55.0.2883.87 Safari/537.36 内容类型: application/json Accept-Encoding: gzip, deflate, br Accept-Language: en-US,en;q=0.8,ja;q=0.6,fr-FR;q=0.4,fr;q=0.2,fr-CA;q=0.2

    {“时间戳”:“2016-11-14 11:08:09+0100”,“消息”:“消息 120”,“主机名”:“myhost.com”,“device_product”:“product123”, “device_vendor”:“vendor123”,“device_version”:“1”,“严重性”: “高”}

    所以你需要在 JSON 解码器之前将这些添加到管道中:

    1. HttpServerCodec
    2. HttpObjectAggregator(对于大型帖子,数据可以分块)
    3. MessageToMessageDecodee 接受 [Full]HttpRequest 并转发内容(作为 ByteBuf)。

    然后 JSON 解码器将获取一大块 JSON 字节并开始向上游发送解析出来的消息。

    【讨论】:

    • 感谢您的回复。但它不会满足我的异步要求。我想在流中处理数据,但是如果我在此之前添加 HttpServerCodec、HttpObjectDecoder 等,它们将被阻止构建 FullHttpRequest。所以,它不会是实际的流处理。
    • 不发布 HTTP。只需将 JSON 作为字节流直接处理即可。 HTTP 不能很好地流式传输。
    • 我该怎么做...我正在构建涉及非常大的数据发布的rest API...
    【解决方案2】:

    要在 HTTP POST 中执行此操作,您需要确保请求被分块。 这是您需要对管道执行的操作的近似值:

    1. HttpServerCodec - 将转发HttpContent 的实例,但第一个实例除外,它将是HttpRequest
    2. MessageToMessageDecoder 接受 HttpContent 实例,提取内容 ByteBuf 并转发。
    3. JSON 解码器。
    4. 您的 JSON 处理程序。

    在某些时候,您会得到一个 HttpContent,它也是 LastHttpContent 的一个实例,它将是最后一个块。

    棘手的部分是,在某些时候,其中一个 HttpContents 将具有不完整的 JSON 序列,这将触发 JSON 解码器中的错误,此时您需要将 ByteBuf 倒回到最后一个已知的“好”位置并等待下一个块出现并完成它,因为我不认为这是自动处理的。

    【讨论】:

    • 感谢您的帮助。我会按照你的建议尝试。我认为它应该有效。
    猜你喜欢
    • 2016-06-11
    • 1970-01-01
    • 2019-04-19
    • 2018-07-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-09-20
    相关资源
    最近更新 更多