【问题标题】:Accessing the underlying array of a Memory<T>访问 Memory<T> 的底层数组
【发布时间】:2021-07-31 00:56:36
【问题描述】:

在我的应用程序中,我需要遍历文件的内容以生成文件的固定大小块的哈希值。最终目标是实现 Amazon Glacier 的 Tree Hash 算法,我几乎一字不差地从他们的文档中复制了代码。

当我通过 SonarQube 运行以下代码时会出现问题:

    byte[] buff = new byte[Mio];
    int bytesRead;

    while ((bytesRead = await inputStream.ReadAsync(buff, 0, Mio)) > 0) {
        // Process the bytes read
    }

我遇到了 while 循环的 Roslyn 问题。问题是“更改 'ReadAsync' 方法调用以使用 'Stream.ReadAsync(Memory, CancellationToken)' 重载”。根据描述,这是因为使用 Memory 类的方法比使用基本数组的方法效率更高。

当类可以从头到尾使用时,这可能是正确的。问题是,我需要将数据提供给HashAlgorithmComputeHash 方法,并且它们没有任何接受Memory 的覆盖。这意味着我必须使用MemoryToArray 方法,它会生成数据的副本。这对我来说听起来不是很有效。

我知道可以通过将现有数组传递给其构造函数来创建 Memory 实例,如下所示:

    byte[] buff = new byte[Mio];
    Memory<byte> memory = new Memory<byte>(buff);
    int bytesRead;

    while ((bytesRead = await inputStream.ReadAsync(memory)) > 0) {
        // Use `buff` to access the bytes
    }

但是文档不清楚传递给构造函数的数组是否实际用作Memory 实例的底层存储。

因此,这是我的问题:

  • 如何将Memory 中的数据直接提供给HashAlgorithm 实例?我说的是派生自HashAlgorithm 的类的任何 实例,而不是具体的SHA256 算法。与 Glacier 不同,我的实现不限于 SHA256。
  • 存储在Memory 实例中的数据是否也可以在用于创建它的数组中访问?
  • 是否有另一种方法可以访问存储在Memory 实例中的数据作为数组,无需复制
  • 如果做不到这一点,我如何才能使 SonarQube 中的外部问题静音(本例中为 Roslyn 警告)?我没有像普通声纳问题那样更改其状态的下拉菜单。

编辑添加有关代码工作原理的其他信息: 它是AWS's example of computing a Glacier Tree Hash 的第一部分,该部分计算文件中 1Mio 块的第一个哈希值。

这些是上面while循环的内容:


// Constructor of the class
// The class implements IDisposable to properly dispose of the Algorithm field
// Constructor is called like this
// `using TreeHash treeHash = new TreeHash(System.Security.Cryptography.SHA512.Create());`
public TreeHash(HashAlgorithm algo) {
    this.Algorithm = algo;
}


// Chunk hash generation loop
// first part of the tree hash algorithm
    byte[][] chunkHashes = new byte[numChunks][];

    byte[] buff = new byte[Mio];
    int bytesRead;
    int idx = 0;

    while ((bytesRead = await inputStream.ReadAsync(buff, 0, Mio)) > 0) {
        chunkHashes[idx++] = this.ComputeHash(buff, bytesRead);
    }


// Quick wrapper around the hash algorithm
// Also used by the second part of the tree hash computation
private byte[] ComputeHash(byte[] data, int count) => this.Algorithm.ComputeHash(data, 0, count);

我默认使用无前缀版本的哈希算法,但我可能会切换到托管版本。如果需要,该方法可以变为非async

【问题讨论】:

  • TryComputeHash/TryHashData 都接受ReadOnlySpan&lt;byte&gt; 您可以将Memory&lt;byte&gt;Span 属性传递给它(带有任何需要的Slice'ing),它不会使任何副本,而是对原始数组(或子部分)进行操作。为了使用 Try 模式,您需要进行一些重组。真正对您的问题有帮助的是您如何使用HashAlgorithm 的示例,尤其是关于当前缓冲区/它如何适合您现有代码的示例。这样我们就可以提供一个实际的答案
  • 另外澄清一下,TryHashData 是使用非托管实现的各种HashAlgorithm 类的static 方法。即Managed 结尾的类。你没有提到你正在使用哪个,但如果你还没有的话,我建议你使用xxxManaged 版本。实例方法 TryComputeHash 可用于托管和非托管变体。
  • 您已更新您的问题。但是您仍然没有提供足够的信息。请显示Algorithm 字段/属性的声明,以便我们可以看到它的声明/静态类型。您如何初始化它的示例也可能会有所帮助

标签: c# arrays sonarqube buffer filestream


【解决方案1】:

以下应该有效。它利用MemoryPool&lt;byte&gt; 来获取IMemoryOwner&lt;byte&gt;,我们可以使用它来检索我们的暂存缓冲区。我们需要一个Memory&lt;byte&gt; 来传递给ReadAsync 调用,所以我们传递IMemoryOwner&lt;byte&gt;Memory 属性。

然后我们重构代码以使用HashAlgorithm.TryComputeHash 方法,该方法接受ReadOnlySpan&lt;byte&gt; 作为源,Span&lt;byte&gt; 作为目标。我们确实分配了一个新数组(而不是使用ArrayPool),因为您正在保存/存储数组。

byte[][] chunkHashes = new byte[numChunks][]; 

using var memory = MemoryPool<byte>.Shared.Rent(Mio);

int bytesRead;
int idx = 0; 

while ((bytesRead = await inputStream.ReadAsync(memory.Memory, CancellationToken.None)) > 0) 
{ 
   var tempBuff = new byte[(int)Math.Ceiling(this.Algorithm.HashSize/8.0)];
   if (this.Algorithm.TryComputeHash(memory.Memory.Span[..bytesRead] /*1*/, tempBuff, out var hashWritten)) 
   {
      chunkHashes[idx++] = hashWritten == tempBuff.Length ? tempBuff : tempBuff[..hashWritten] /*2*/;
   } 
   else
      throw new Exception("buffer not big enough");
}

对于源,我们传递Memory&lt;bytes&gt; 缓冲区的Span 属性,该缓冲区再次从IMemoryOwner&lt;byte&gt;.Memory 属性中检索。我们根据读取的字节数将其切成适当的长度。我们作为目标传递的Span&lt;byte&gt; 必须至少是算法的HashSize 属性的大小,即比特数不是 bytes) 是散列所需的。由于实现有可能(尽管我认为不太可能)使用 不是 8 的倍数的大小,因此我们上限在必要时将除法四舍五入。我们不需要调用AsSpan,因为有来自T[] 的隐式转换。

我相信*最终写入的字节数将始终与HashSize 的长度相同。如果/当它是,我们只需使用原始数组。否则,我们需要根据写入的哈希字节数将其切片为正确的长度。

如果缓冲区不够大,TryComputeHash 返回false,我们抛出异常。我相当肯定这不会发生在我们身上,因为我们是根据HashSize 明确计算大小的,但我们还是将这种情况作为最佳实践来处理。

我已通过CancellationToken.None,但您可以提供自己的令牌。我还使用Range 语法而不是显式调用Slice。如果您无法使用,或者您只是不喜欢它的外观,您可以明确说明:

/*1*/ memory.Memory.Span.Slice(0, bytesRead)
/*2*/ tempBuff.AsSpan(0, hashWritten).ToArray()

我们可以做出一些可能的假设:

  • 假设 HashSize 总是 8 的倍数
  • 假设HashSize 始终等于写入的字节数,并且不对最终数组进行切片
  • 假设我们总是提供足够大的缓冲区(根据上面的说法,这将是所需的确切大小)并删除 ifException
while ((bytesRead = await inputStream.ReadAsync(memory.Memory, CancellationToken.None)) > 0)
{
   var tempBuff = new byte[this.Algorithm.HashSize/8];
   _ = this.Algorithm.TryComputeHash(memory.Memory.Span[..bytesRead], tempBuff, out _);
   chunkHashes[idx++] = tempBuff;
} 

* 不幸的是,我不能 100% 肯定地说这些是可以做出的有效假设。我查看的大多数实现都有Debug.Assert 的源代码来验证缓冲区大小和写入的字节是否相同,所以我认为它们是合理的。也就是说,我认为我个人会坚持使用更详细的选项。

您还会注意到我已经删除了您的 ComputeHash 函数。这并不是说您仍然不能使用它,但我将其转换为基于 TryMemory&lt;&gt; 模式作为练习留给读者。

【讨论】:

  • 我已经修复了语法错误并使代码 async 友好(不能有 Span/ref structs 的局部变量)
  • 最后我使用了您提供的代码 sn-ps 的变体,TryCompputeHashmemory.Span。它按预期工作。非常感谢!
  • @Nathan.EilishaShirani 没问题。我会确保你使用 MemoryPool 而不是自己创建一个新的 Memory 对象......特别是因为你似乎关心额外的、不必要的分配。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多