【问题标题】:What is the correct way to use async/await in a recursive method?在递归方法中使用 async/await 的正确方法是什么?
【发布时间】:2014-09-20 09:02:04
【问题描述】:

在递归方法中使用 async/await 的正确方法是什么?这是我的方法:

public string ProcessStream(string streamPosition)
{
    var stream = GetStream(streamPosition);

    if (stream.Items.count == 0)
        return stream.NextPosition;

    foreach(var item in stream.Items) {
        ProcessItem(item);
    }

    return ProcessStream(stream.NextPosition)
}

这里是异步/等待的方法:

public async Task<string> ProcessStream(stringstreamPosition)
{
        var stream = GetStream(streamPosition);

        if (stream.Items.count == 0)
            return stream.NextPosition;

        foreach(var item in stream.Items) {
            await ProcessItem(item); //ProcessItem() is now an async method
        }

        return await ProcessStream(stream.NextPosition);
 }

【问题讨论】:

  • 您的尝试是否有任何错误?
  • @StephenCleary 不,似乎工作正常。我只是想知道是否有任何潜在的危险。
  • @zerkms 我的代码如何同步?使用 await 关键字,该方法应该立即返回给调用者,对吗?我今天才开始学习 async/await,所以我的理解可能存在很多差距。
  • @Prabhu:是的,这是我的错误
  • 对我来说看起来很正确。不知道你在这里期待什么。

标签: c# .net asynchronous recursion async-await


【解决方案1】:

虽然我必须提前说,该方法的意图对我来说并不完全清楚,但用一个简单的循环重新实现它是非常简单的:

public async Task<string> ProcessStream(string streamPosition)
{
    while (true)
    {
        var stream = GetStream(streamPosition);

        if (stream.Items.Count == 0)
            return stream.NextPosition;

        foreach (var item in stream.Items)
        {
            await ProcessItem(item); //ProcessItem() is now an async method
        }

        streamPosition = stream.NextPosition;
    }
}

递归不是堆栈友好的,如果您可以选择使用循环,那么在简单的同步场景(其中控制不佳的递归最终导致StackOverflowExceptions)以及异步场景中,它绝对值得研究,老实说,我什至不知道如果你把事情推得太远会发生什么(每当我尝试使用async 方法重现已知的堆栈溢出场景时,我的 VS 测试资源管理器就会崩溃)。

Recursion and the await / async Keywords 等答案表明 StackOverflowExceptionasync 的问题不大,因为 async/await 状态机的工作方式,但这不是我探索过的东西,因为我倾向于避免递归尽可能。

【讨论】:

  • @Krill 非常感谢。另一个问题是 foreach 将一个接一个地处理,因为它正在等待 ProcessItem。反正有没有加快它,所以它不等待 ProcessItem。我有点需要只是触发并忘记,但如果我取出等待,编译器会警告我在异步方法中不使用等待不是一个好主意。
  • @Prabhu,这是高度特定于场景的,取决于多个因素,即您的代码的线程安全程度,以及您希望 stream.Items 包含多少项。线程安全至关重要,您需要确保多个ProcessItems 可以并行运行。除此之外 - 如果项目的数量并不重要,但每个项目都需要很长时间来处理,我会将 foreach 替换为 await Task.WhenAll(stream.Items.Select(ProcessItem));。如果项目数量很大且处理速度很快,我将恢复为同步处理并改用Parallel.ForEach
  • 在我的情况下,项目的数量非常大。很想知道为什么parallel.ForEach(这是我现在拥有的)在这种情况下会表现得更好。
  • @Prabhu,那是因为Tasks 非常“重”,在处理 large 集合时,您真的不希望每个项目都有一个。 Parallel.ForEach 使用智能集合分区来实现并行化而没有太多开销(不包括您处理真正小型委托主体的情况),并提供了许多可选的调整和优化(即本地状态),您应该对其进行研究,以充分发挥其性能。
  • 谢谢。我可以生火然后忘记 ProcessItem(item) 吗?
【解决方案2】:

当我添加代码以使您的示例更具体时,我发现递归结果很糟糕的两种可能方式。他们都假设你的数据非常大,需要特定的条件才能触发。

  1. 如果ProcessItem(string) 返回一个在awaited 之前完成的Task(或者,我假设它在await 完成旋转之前完成),则继续将同步执行。在下面的代码中,我通过让ProcessItem(string) 返回Task.CompletedTask 来模拟这一点。当我这样做时,程序 very 很快以StackOverflowException 终止。这是因为 .net 的 TPL “ReleasesZalgo” by opportunistically executing continuations synchronously 不考虑当前堆栈中有多少可用空间。这意味着它会加剧使用递归算法已经存在的潜在堆栈空间问题。要查看这一点,请在下面的代码示例中注释掉 await Task.Yield();
  2. 如果您使用某种技术来阻止 TPL 同步继续(下面我使用Task.Yield()),最终程序将耗尽内存并以OutOfMemoryException 终止。如果我理解正确,如果return await 能够模拟尾调用优化,就不会发生这种情况。我想这里发生的事情是每个调用都会生成类似于簿记Task&lt;string&gt; 的东西,并且即使它们可以合并也会继续生成它们。要使用下面的示例重现此错误,请确保您以 32 位运行程序,禁用 Console.WriteLine() 调用(因为控制台真的很慢),并确保取消注释 await Task.Yield()
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Be sure to run this 32-bit to avoid making your system unstable.
class StreamProcessor
{
    Stream GetStream(string streamPosition)
    {
        var parsedStreamPosition = Convert.ToInt32(streamPosition);
        return new Stream(
            // Terminate after we reach 0.
            parsedStreamPosition > 0 ? new[] { streamPosition, } : new string[] { },
            Convert.ToString(parsedStreamPosition - 1));
    }

    Task ProcessItem(string item)
    {
        // Comment out this next line to make things go faster.
        Console.WriteLine(item);
        // Simulate the Task represented by ProcessItem finishing in
        // time to make the await continue synchronously.
        return Task.CompletedTask;
    }

    public async Task<string> ProcessStream(string streamPosition)
    {
        var stream = GetStream(streamPosition);

        if (stream.Items.Count == 0)
            return stream.NextPosition;

        foreach (var item in stream.Items)
        {
            await ProcessItem(item); //ProcessItem() is now an async method
        }

        // Without this yield (which prevents inline synchronous
        // continuations which quickly eat up the stack),
        // you get a StackOverflowException fairly quickly.
        // With it, you get an OutOfMemoryException eventually—I bet
        // that “return await” isn’t able to tail-call properly at the Task
        // level or that TPL is incapable of collapsing a chain of Tasks
        // which are all set to resolve to the value that other tasks
        // resolve to?
        await Task.Yield();

        return await ProcessStream(stream.NextPosition);
    }
}

class Program
{
    static int Main(string[] args) => new Program().Run(args).Result;
    async Task<int> Run(string[] args)
    {
        await new StreamProcessor().ProcessStream(
            Convert.ToString(int.MaxValue));
        return 0;
    }
}

class Stream
{
    public IList<string> Items { get; }
    public string NextPosition { get; }
    public Stream(
        IList<string> items,
        string nextPosition)
    {
        Items = items;
        NextPosition = nextPosition;
    }
}

所以,我想我的两个建议是:

  1. 如果您不确定递归的堆栈增长是否会被其他事物中断,请使用 Task.Yield()
  2. As suggested already,如果一开始对您的问题没有意义,请避免递归。即使它是一个干净的算法,如果你的问题规模是无限的,也要避免它。

【讨论】:

    猜你喜欢
    • 2017-12-28
    • 2018-12-16
    • 2019-04-21
    • 2017-05-13
    • 2021-07-06
    • 2018-06-10
    • 1970-01-01
    • 2014-03-16
    • 1970-01-01
    相关资源
    最近更新 更多