【问题标题】:How can I use "Where" with an async predicate?如何将“Where”与异步谓词一起使用?
【发布时间】:2013-01-31 03:35:47
【问题描述】:

我有一个这样的异步谓词方法:

private async Task<bool> MeetsCriteria(Uri address)
{
    //Do something involving awaiting an HTTP request.
}

假设我有一个Uris 的集合:

var addresses = new[]
{
    new Uri("http://www.google.com/"),
    new Uri("http://www.stackoverflow.com/") //etc.
};

我想使用MeetsCriteria 过滤addresses。我想异步执行此操作;我希望对谓词的多次调用异步运行,然后我想等待它们全部完成并生成过滤后的结果集。不幸的是,LINQ 似乎不支持异步谓词,所以像这样工作:

var filteredAddresses = addresses.Where(MeetsCriteria);

有没有类似的方便的方法来做到这一点?

【问题讨论】:

  • 如果这得到支持,您期望会发生什么?尤其是在迭代 filteredAddresses 时,实际上是在调用 MeetsCriteria 时。
  • @DanielHilgarth:谢谢;那是个很好的观点。这似乎并不适合 LINQ。

标签: .net linq async-await c#-5.0


【解决方案1】:

考虑到框架的较新版本和IAsyncEnumerable&lt;T&gt; 接口的采用,我不会再在这里建议任何其他高度自定义的答案,因为它们在很大程度上是不必要的。

可通过the System.Linq.Async NuGet package 获得异步版本的 LINQ。

这是进行异步检查的语法:

var filteredAddresses = addresses
    .ToAsyncEnumerable()
    .WhereAwait(async x => await MeetsCriteria(x));

filteredAddresses 的类型为 IAsyncEnumerable&lt;int&gt;,可以是:

  • 通过ToListAsyncFirstAsync 等实现
  • await foreach迭代

要获得和以前一样的效果并允许使用方法组调用,可以将MeetsCriteria的返回类型更改为ValueTask

private async ValueTask<bool> MeetsCriteria(Uri address)
{
    //Do something involving awaiting an HTTP request.
}

...

var filteredAddresses = addresses
    .ToAsyncEnumerable()
    .WhereAwait(MeetsCriteria);

我不建议只使用 ValueTask 来保存几个字符,因为它应该进行基准测试并用于性能/内存原因。

【讨论】:

    【解决方案2】:

    我会使用以下方法而不是使用ConcurrentBagConcurrentQueue

    public static async IAsyncEnumerable<T> WhereAsync<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        foreach(var item in source)
        {
            if(await (predicate(item)))
            {
                yield return item;
            }
        }
    }
    

    例如

        var result =  numbers.WhereAsync(async x =>
                                                   await IsEvenAsync(x));
        await foreach (var x in result)
        {
            Console.Write($"{x},");
        }
    

    【讨论】:

    • 这里没有并行谓词计算。
    • 任务是并行评估的。当我们 await 一个任务时,没有什么能阻止另一个任务的执行(假设它是一个 IO 操作或另一个线程)。
    【解决方案3】:

    我认为这比不使用任何并发队列的公认答案更简单。

    public static async Task<IEnumerable<T>> Where<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        var results = await Task.WhenAll(source.Select(async x => (x, await predicate(x))));
        return results.Where(x => x.Item2).Select(x => x.Item1);
    }
    

    【讨论】:

      【解决方案4】:

      第一种方法:一个接一个地发出所有请求,然后等待所有请求返回,然后过滤结果。 (svick 的代码也是这样做的,但这里我是在没有中间 ConcurrentQueue 的情况下这样做的)。

      // First approach: massive fan-out
      var tasks = addresses.Select(async a => new { A = a, C = await MeetsCriteriaAsync(a) });
      var addressesAndCriteria = await Task.WhenAll(tasks);
      var filteredAddresses = addressAndCriteria.Where(ac => ac.C).Select(ac => ac.A);
      

      第二种方法:一个接一个地执行请求。这将花费更长的时间,但它会确保不会用大量的请求冲击 web 服务(假设 MeetsCriteriaAsync 会发送到 web 服务......)

      // Second approach: one by one
      var filteredAddresses = new List<Uri>();
      foreach (var a in filteredAddresses)
      {
        if (await MeetsCriteriaAsync(a)) filteredAddresses.Add(a);
      }
      

      第三种方法:与第二种方法一样,但使用假设的 C#8 功能“异步流”。 C#8 尚未推出,异步流尚未设计,但我们可以梦想! RX 中已经存在 IAsyncEnumerable 类型,希望他们会为它添加更多组合器。 IAsyncEnumerable 的好处是我们可以在前几个filteredAddresses 到来时立即开始使用它们,而不是等待所有内容都被过滤掉。

      // Third approach: ???
      IEnumerable<Uri> addresses = {...};
      IAsyncEnumerable<Uri> filteredAddresses = addresses.WhereAsync(MeetsCriteriaAsync);
      

      第四种方法:也许我们不想同时处理所有请求,但我们很乐意一次发出多个请求。也许我们做了实验,发现“一次三个”是一种快乐的媒介。注意:此代码假定在 UI 编程或 ASP.NET 等单线程执行上下文中。如果它在多线程执行上下文中运行,那么它需要一个 ConcurrentQueue 和 ConcurrentList。

      // Fourth approach: throttle to three-at-a-time requests
      var addresses = new Queue<Uri>(...);
      var filteredAddresses = new List<Uri>();
      var worker1 = FilterAsync(addresses, filteredAddresses);
      var worker2 = FilterAsync(addresses, filteredAddresses);
      var worker3 = FilterAsync(addresses, filteredAddresses);
      await Task.WhenAll(worker1, worker2, worker3);
      
      async Task FilterAsync(Queue<Uri> q, List<Uri> r)
      {
        while (q.Count > 0)
        {
          var item = q.Dequeue();
          if (await MeetsCriteriaAsync(item)) r.Add(item);
        }
      }
      

      第四种方法也可以使用 TPL 数据流库。

      【讨论】:

        【解决方案5】:

        我认为框架中没有这样的原因之一是存在许多可能的变化,并且在某些情况下每个选择都是正确的:

        • 谓词应该并行执行还是串行执行?
          • 如果它们并行执行,它们应该同时执行,还是应该限制并行度?
          • 如果它们并行执行,结果应该与原始集合的顺序相同、完成顺序还是未定义顺序?
            • 如果它们应该按完成的顺序返回,是否应该有某种方法(异步)在它们完成时获取结果? (这需要将返回类型从 Task&lt;IEnumerable&lt;T&gt;&gt; 更改为其他类型。)

        您说您希望谓词并行执行。在这种情况下,最简单的选择是一次全部执行并按完成顺序返回:

        static async Task<IEnumerable<T>> Where<T>(
            this IEnumerable<T> source, Func<T, Task<bool>> predicate)
        {
            var results = new ConcurrentQueue<T>();
            var tasks = source.Select(
                async x =>
                {
                    if (await predicate(x))
                        results.Enqueue(x);
                });
            await Task.WhenAll(tasks);
            return results;
        }
        

        你可以这样使用它:

        var filteredAddresses = await addresses.Where(MeetsCriteria);
        

        【讨论】:

        • 我会使用不同的方法名称,因此不同的语义(特别是重新排序)变得清晰。
        • @CodesInChaos 是的,可能,但我不确定什么是好名字。 AsyncParallelWhereOrderedByCompletion() 会描述该方法的作用,但这是一个糟糕的名字。
        • 也许像ConcurrentlyFilterAsync 这样的名字比较合适。
        猜你喜欢
        • 1970-01-01
        • 2021-07-28
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-07-29
        • 2018-04-02
        • 1970-01-01
        • 2018-04-16
        相关资源
        最近更新 更多