【问题标题】:Pipelines buffer preserving until processing is complete管道缓冲区保留直到处理完成
【发布时间】:2020-12-30 01:16:38
【问题描述】:

我正在研究使用管道处理来自网络的二进制消息的可能性。 我将要处理的二进制消息带有一个有效负载,最好将有效负载保持为二进制形式。

这个想法是读出整个消息并创建一个消息片段及其有效负载,一旦消息被完全读取,它将被传递到通道链进行处理,处理不是即时的,可能需要一些时间或者稍后执行,目标是不让管道读取器等待直到处理完成,然后一旦消息处理完成,我需要将处理后的缓冲区释放给管道写入器。

现在我当然可以创建一个新的字节数组并复制来自管道写入器的数据,但这会超出不复制的目的吗?所以据我所知,我需要管道和通道之间的一些缓冲区同步? 我观察了管道读取器的可用 api (AdvanceTo),它可以告诉管道读取器消耗了什么以及检查了什么,但无法解决如何在管道读取方法之外同步它。

所以问题是是否有一些技术或示例说明如何实现这一点。

【问题讨论】:

    标签: c# system.threading.channels system.io.pipelines


    【解决方案1】:

    TryRead/ReadAsync 获得的缓冲区仅在您调用AdvanceTo 之前有效,期望一旦您这样做了:您报告为消耗的任何内容都是可回收在其他地方使用(可以是并行/并发阅读器)。严格来说:即使是您没有报告为已消耗的位:一旦您调用了AdvanceTo,您仍然不应将其视为有效(尽管实际上,它们很可能仍会是相同的段 - 只是:这不是调用者关心的问题;对调用者来说,它只在读取和前进之间有效)。

    这意味着你明确不能这样做:

    while (...)
    {
        var result = await pipe.ReadAsync();
        if (TryIdentifyFrameBoundary(out var frame)) {
            BeginProcessingInBackground(frame); // <==== THIS IS A PROBLEM!
            reader.AdvanceTo(frame.End, frame.End);
        }
        else if { // take nothing
            reader.AdvanceTo(buffer.Start, buffer.End);
            if (result.IsCompleted) break; // that's all folks
        }
    }
    

    因为“在后台”位,当它触发时,现在可能正在读取其他人的数据(由于它已经被重复使用)。

    所以:要么您需要将帧内容作为读取循环的一部分进行处理,或者您将不得不复制数据,大多数情况下可能通过使用:

    c#
    var len = checked ((int)buffer.Length);
    var oversized = ArrayPool<byte>.Shared.Rent(len);
    buffer.CopyTo(oversized);
    

    并将oversized 传递给您的后台处理,记住只查看它的第一个len 字节。您可以将其作为ReadOnlyMemory&lt;byte&gt; 传递,但您需要考虑之后您还希望将其返回到数组池(可能在finally 块中),并且将其作为记忆传递会使其更加尴尬(但并非不可能,感谢MemoryMarshal.TryGetArray)。


    注意:在管道 API 的早期版本中,有一个引用计数元素,确实允许您保留缓冲区,但它有一些问题:

    • 这让 API 变得非常复杂
    • 导致缓冲区泄漏
    • “保留”的含义模棱两可且令人困惑; 重用之前的计数是多少?还是完全发布

    所以该功能被删除了。

    【讨论】:

    • 知道了,如果没有找到解决方案,我会这样做。一个问题,因为我将有一些函数会在调用时产生未知/可变大小的缓冲区,所以最好的做法是在函数参数中传递共享 MemoryPool 以便函数可以分配所需的缓冲区?函数调用者将负责在处理完缓冲区后释放缓冲区。这是一种常见的方法吗?
    • @NullReference 老实说,除非有充分的理由,否则我会保持简单并使用共享缓冲池 (ArrayPool&lt;T&gt;.Shared)
    • 嗯,是的,它可能是一个共享的,我会检查我是否有任何充分的理由:) 据我了解,ArrayPool 和 MemoryPool 之间的选择将取决于我将使用的底层 api解析/处理数据?
    • @NullReference 在我看来不是真的;您可以从T[] 轻松创建[ReadOnly]Memory&lt;T&gt;,同时修复“超大”的东西,而MemoryPool&lt;T&gt; 具有分配IDisposable 的开销。坦率地说,在大多数代码中,几乎没有理由使用MemoryPoolT&lt;T&gt; 而不仅仅是ArrayPool&lt;T&gt;。当它确实变得有趣时:当你想为你的池使用非托管内存时。
    猜你喜欢
    • 2012-07-11
    • 1970-01-01
    • 1970-01-01
    • 2013-01-16
    • 2014-08-29
    • 1970-01-01
    • 2012-01-25
    • 1970-01-01
    相关资源
    最近更新 更多