【问题标题】:Chaining arbitrary number of tasks together in C#.NET在 C#.NET 中将任意数量的任务链接在一起
【发布时间】:2020-02-08 09:52:48
【问题描述】:

我有什么

我有一套异步处理方法,类似于:

public class AsyncProcessor<T>
{
    //...rest of members, etc.

    public Task Process(T input)
    {
        //Some special processing, most likely inside a Task, so
        //maybe spawn a new Task, etc.
        Task task = Task.Run(/* maybe private method that does the processing*/);
        return task;
    }
}

我想要什么

我想将它们全部链接在一起,按顺序执行

我尝试了什么

我已尝试执行以下操作:

public class CompositeAsyncProcessor<T>
{
    private readonly IEnumerable<AsyncProcessor<T>> m_processors;

    //Constructor receives the IEnumerable<AsyncProcessor<T>> and
    //stores it in the field above.

    public Task ProcessInput(T input)
    {
        Task chainedTask = Task.CompletedTask;

        foreach (AsyncProcessor<T> processor in m_processors)
        {
            chainedTask = chainedTask.ContinueWith(t => processor.Process(input));
        }

        return chainedTask;
    }
}

出了什么问题

但是,任务不会按顺序运行,因为据我了解,在对ContinueWith 的调用中,processor.Process(input) 调用会立即执行,并且该方法会独立于状态返回返回的任务。因此,所有处理任务仍然几乎同时开始。

我的问题

我的问题是我是否可以做一些优雅的事情来按顺序链接任务(即没有执行重叠)。例如,我可以使用以下语句来实现这一点吗(我在细节上有点挣扎)?

chainedTask = chainedTask.ContinueWith(async t =&gt; await processor.Process(input));

另外,我如何不使用 async/await,只使用 ContinueWith 来做到这一点?

我为什么要这样做?

因为我的 Processor 对象可以访问“thread-unsafe”资源并从其请求内容。 另外,我不能只等待所有方法,因为我不知道它们有多少,所以我不能只写下必要的代码行。

线程不安全是什么意思?一个具体的问题

因为我可能用错了这个词,所以用插图来解释这一点会更好一些。在我的Processor 对象使用的“资源”中,它们都可以访问如下对象:

public interface IRepository
{
    void Add(object obj);

    bool Remove(object obj);

    IEnumerable<object> Items { get; }
}

目前使用的实现比较幼稚。所以一些Processor 对象添加了东西,而另一些则检索Items 进行检查。当然,我经常遇到的一个例外是:

InvalidOperationException: 集合被修改,枚举 操作可能无法执行。

我可以花一些时间锁定访问并预运行枚举。然而,这是我要考虑的第二个选项,而我的第一个想法是让进程按顺序运行。

为什么我必须使用任务?

虽然我在 this 情况下拥有完全控制权,但我可以说,出于问题的目的,我可能无法更改基本实现,所以如果我被卡住会发生什么任务? 此外,这些操作实际上确实代表了相对耗时的 CPU 密集型操作,而且我正在尝试实现响应式用户界面,因此我需要减轻异步操作的一些负担。虽然很有用,并且在我的大多数用例中,没有必要链接多个,而是每次链接一个(或几个,但总是特定的和特定的计数,所以我能够挂钩它们一起没有迭代和异步/等待),其中一个用例最终需要将未知数量的任务链接在一起。

我目前如何处理这个问题

我目前处理这个问题的方式是在ContinueWith 调用中附加对Wait() 的调用,即:

foreach (AsyncProcessor<T> processor in m_processors)
{
    chainedTask = chainedTask.ContinueWith(t => processor.Process(input).Wait());
}

我会很感激任何关于我应该如何做到这一点的想法,或者我可以如何更优雅地做到这一点(或者,可以说是“异步正确”)。另外,我想知道如何不使用 async/await 来做到这一点。

为什么我的问题与 this question 不同,后者并没有完全回答我的问题。

因为链接的问题有 两个 任务,所以解决方案是简单地编写所需的 两个 行,而我有任意(且未知)数量的任务,所以我需要一个合适的迭代。另外,我的方法是不是异步的。我现在明白(从已删除的单个简要可用答案中)如果我将方法更改为 asyncawait 每个处理器的 Task 方法,我可以相当容易地做到这一点,但是我仍然想知道如何在没有 async/await 语法的情况下实现这一点。

为什么我的问题与其他链接的问题不重复

因为他们都没有解释如何正确使用 ContinueWith 进行链接,而我对使用 ContinueWith 并且不使用 async/await 模式的解决方案感兴趣。我知道这种模式可能是更可取的解决方案,我想了解如何(如果可能)正确使用 ContinueWith 调用进行任意链接。我现在知道我don't need ContinueWith。问题是,how do I do it with ContinueWith?

【问题讨论】:

  • 作为程序员的一部分是学习识别相同问题的抽象形式。在您的情况下,标记的重复项确实回答了您的问题,因为它们解释了 await 和其他技术的用法,以确保给定的 Task 在继续下一步处理之前已完成。请注意,一般情况下,当然在您的场景中,使用ContinueWith() 是多余的。你没有完成任何调用await 的工作。所以在你的循环中,只需await 任务,然后调用Process()
  • ContinueWith predates async/await 构造,因为它已在 C# 4.0 和 .NET Framework 4.0 (TPL) 中可用,而 async/await 是在 C# 5.0 中引入的。当 await 是一件事(即语言关键字)时,我已经了解任务实际上是如何“等待”的。我还想知道没有异步/等待是如何做到的。那时开发人员更喜欢是如何做到的?
  • 嗯,是的,我不知道我现在(认为)我知道什么:创建一个任务返回方法 asyncconsumers没有实际影响>...它只是帮助一个人使用await,这让一个人的生活更轻松。所以我不准备改变使用异步的方法,因此,我正在寻找替代方案。由于使方法异步对“外部世界”没有任何影响,因此我现在可能会使用它。
  • 顺便说一句,如果我站在你的立场上,我不会花太多时间优化 CompositeAsyncProcessor 类,因为已经有可用的库可以做它所做的事情,还有更多,而且性能很高,经过广泛测试和积极维护。我想到了TPL Dataflow,尤其是ActionBlock 类。学习这个图书馆可能是您更好的时间投资。
  • 这很有趣!感谢您提供的信息,我会研究一下。我几乎肯定会使用ContinueWith 方法并转向async/await。但是,这个问题和答案(包括你的)帮助我澄清了一些我不完全确定的事情,因此尽管手头的问题“不适用”,但它还是很有帮助的。

标签: c# asynchronous task task-parallel-library method-chaining


【解决方案1】:

foreach + await 将依次运行Processes。

    public async Task ProcessInputAsync(T input)
    {
        foreach (var processor in m_processors)
        {
            await processor.Process(input));
        }
    }

顺便说一句。 Process,应该叫ProcessAsync

【讨论】:

    【解决方案2】:

    Task.ContinueWith 方法不理解异步委托,就像 Task.Run 一样,因此当您返回 Task 时,它会将其视为正常返回值并将其包装在另一个 Task 中。所以你最终会收到Task&lt;Task&gt;,而不是你期望得到的。如果AsyncProcessor.Process 返回一个通用的Task&lt;T&gt;,问题就很明显了。在这种情况下,由于从Task&lt;Task&lt;T&gt;&gt;Task&lt;T&gt; 的非法转换,您会收到编译错误。在您的情况下,您从Task&lt;Task&gt; 转换为Task,这是合法的,因为Task&lt;TResult&gt; 派生自Task

    解决问题很容易。您只需将Task&lt;Task&gt; 解包为一个简单的Task,并且有一个内置方法Unwrap 可以做到这一点。

    不过还有一个问题需要解决。目前,您的代码抑制了每个人 AsyncProcessor.Process 上可能发生的所有异常,我认为这不是有意的。因此,您必须决定在这种情况下采用哪种策略。您是要立即传播第一个异常,还是更喜欢将它们全部缓存并在最后传播它们并捆绑在 AggregateException 中,就像 Task.WhenAll 一样?下面的例子实现了第一个策略。

    public class CompositeAsyncProcessor<T>
    {
        //...
        public Task Process(T input)
        {
            Task current = Task.CompletedTask;
            foreach (AsyncProcessor<T> processor in m_processors)
            {
                current = current.ContinueWith(antecessor =>
                {
                    if (antecessor.IsFaulted)
                        return Task.FromException<T>(antecessor.Exception.InnerException);
                    return processor.Process(input);
                },
                    CancellationToken.None,
                    TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default
                ).Unwrap();
            }
            return current;
        }
    }
    

    我使用了ContinueWith 的重载,它允许配置所有选项,因为默认值并不理想。默认TaskContinuationOptionsNone。将其配置为 ExecuteSynchronously 可以最大限度地减少线程切换,因为每个延续都将在完成前一个的同一线程中运行。

    默认任务调度程序是TaskScheduler.Current。通过指定TaskScheduler.Default,您可以明确表示您希望继续在线程池线程中运行(对于某些无法同步运行的特殊情况)。 TaskScheduler.Current 是特定于上下文的,如果它让您感到惊讶,那将不是一个好方法。

    如您所见,老式的ContinueWith 方法存在很多问题。在循环中使用现代的await 更容易实现,而且更难出错。

    【讨论】:

      猜你喜欢
      • 2013-06-03
      • 2021-11-27
      • 1970-01-01
      • 2013-02-08
      • 1970-01-01
      • 1970-01-01
      • 2012-07-01
      • 2020-05-07
      • 2016-06-16
      相关资源
      最近更新 更多