【问题标题】:Async generator, previous iterations await a future iteration?异步生成器,以前的迭代等待未来的迭代?
【发布时间】:2022-01-27 22:12:04
【问题描述】:

我想生成一个可枚举的任务,这些任务将在不同的时间完成。

如何在 C# 中制作一个生成器:

  • 产生任务
  • 每隔几次迭代,就会解决以前产生的任务,结果现在才知道

我想要这样做的原因是因为我正在处理一个很长的可迭代输入,而且我经常从这些输入中积累足够的数据来发送批处理 API 请求并最终确定我的输出。

伪代码:

IEnumerable<Task<Output>> Process(IEnumerable<Input> inputs)
{
    var queuedInputs = Queue<Input>();
    var cumulativeLength = 0;
    foreach (var input in inputs)
    {
        yield return waiting task for this input
        queuedInputs.Enqueue(input);
        cumulativeLength += input.Length;
        if (cumulativeLength > 10)
        {
            cumulativeLength = 0
            GetFromAPI(queue).ContinueWith((apiTask) => {
                Queue<BatchResult> batchResults = apiTask.Result;
                while (queuedInputs.Count > 0)
                {
                    batchResult = batchResults.Dequeue();
                    historicalInput = queuedInputs.Dequeue();
                    var output = MakeOutput(historicalInput, batchResult);
                    resolve earlier input's task with this output
                }
            });
        }
    }
}

【问题讨论】:

  • task.ContinueWith(result =&gt; ...); 方法返回一个任务,在您的示例中被忽略。您真的打算以fire-and-forget 的方式启动延续吗?您是否还知道ContinueWith intricacies 关于TaskScheduler 在未提供scheduler 参数时默认使用?
  • TaskCompletionSource 相当于手动创建和解析 Promise。但我不确定你想做什么。
  • @TheodorZoulias 火灾和遗忘有什么问题?我对任何 TaskScheduler 的复杂性一无所知
  • theonlygusti 我在之前的评论中提供了两个链接,它们解释了为什么使用即发即弃和ContinueWith 通常不是一个好主意。我的理解是,您正在尝试使用 C# 做一些在 JavaScript 中很常见的事情,但在 C# 世界中却是惯用且非常规的。无论您尝试做什么,都可能有更好的方法来做到这一点,可能使用IAsyncEnumerable&lt;T&gt;s 和 System.Linq.Async 包或 TPL 数据流库。如果您想要更有针对性的建议,您应该描述您正在尝试解决的更广泛的问题。
  • 你可以阅读here微软关于为任务添加Then方法的意见,这(据我了解)比现有的ContinueWith更接近Javascript的then方法。跨度>

标签: c# async-await task


【解决方案1】:

解决方案的形式将取决于问题的形式。我有几个问题,因为您的问题域似乎很奇怪:

  1. 一开始就知道您的所有输入吗? (同步)IEnumerable&lt;Input&gt; 表示它们是。
  2. 您确定要在发送 any 查询之前等待一批输入吗?如果您按 10 个批处理但有 55 个输入,那么“余数”呢?

假设您确实有同步输入,并且您想对余数进行批处理,您可以立即累积所有输入,对它们进行批处理,然后遍历批处理,异步提供输出:

async IAsyncEnumerable<Output> Process(IEnumerable<Input> inputs)
{
  foreach (var batchedInput in inputs.Batch(10))
  {
    var batchResults = await GetFromAPI(batchedInput);
    for (int i = 0; i != batchedInput.Count; ++i)
      yield return MakeOutput(batchedInput[i], batchResults[i]);
  }
}

public static IEnumerable<IReadOnlyList<TSource>> Batch<TSource>(this IEnumerable<TSource> source, int size)
{
  List<TSource>? batch = null;
  foreach (var item in source)
  {
    batch ??= new List<TSource>(capacity: size);
    batch.Add(item);
    if (batch.Count == size)
    {
      yield return batch;
      batch = null;
    }
  }

  if (batch?.Count > 0)
    yield return batch;
}

更新:

如果您想立即启动 API 调用,可以将它们移出循环:

async IAsyncEnumerable<Output> Process(IEnumerable<Input> inputs)
{
  var batchedInputs = inputs.Batch(10).ToList();
  var apiCallTasks = batchedInputs.Select(GetFromAPI).ToList();
  foreach (int i = 0; i != apiCallTasks.Count; ++i)
  {
    var batchResults = await apiCallTasks[i];
    var batchedInput = batchedInputs[i];
    for (int j = 0; j != batchedInput.Count; ++j)
      yield return MakeOutput(batchedInput[j], batchResults[j]);
  }
}

【讨论】:

  • 我不知道批次会有多大,这取决于输入的一些累积属性。任何剩余部分都应该在最后处理。
  • @theonlygusti:您可以修改Batch 方法然后处理该逻辑。
  • 在 .NET 6 上,Chunk LINQ 运算符可用。对于基于某些任意属性的批处理,我发布了一个实现 here (BatchBySize)。
  • 在此解决方案中最推荐使用async IAsyncEnumerable 的方法是什么,但要先发制人地调用GetFromAPIs,以便在每批之后可能不必像现在一样长时间暂停当前?
【解决方案2】:

一种方法是使用TPL Dataflow 库。该库提供了各种名为“块”的组件(TransformBlockActionBlock 等),其中每个块都在处理其输入数据,然后将结果传播到下一个块。这些块被链接在一起,以便管道中前一个块的完成触发下一个块的完成等,直到最后一个块通常是一个没有输出的ActionBlock&lt;T&gt;。这是一个例子:

var block1 = new TransformBlock<int, string>(item =>
{
    Thread.Sleep(1000); // Simulate synchronous work
    return item.ToString();
}, new()
{
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    EnsureOrdered = false
});

var block2 = new BatchBlock<string>(batchSize: 10);

var block3 = new ActionBlock<string[]>(async batch =>
{
    await Task.Delay(1000); // Simulate asynchronous work
}); // The default MaxDegreeOfParallelism is 1

block1.LinkTo(block2, new() { PropagateCompletion = true });
block2.LinkTo(block3, new() { PropagateCompletion = true });

// Provide some input in the pipeline
block1.Post(1);
block1.Post(2);
block1.Post(3);
block1.Post(4);
block1.Post(5);

block1.Complete(); // Mark the first block as completed
await block3.Completion; // Await the completion of the last block

TPL 数据流库功能强大且灵活,但在异常传播方面存在弱点。如果block3 失败,没有内置方法可以指示block1 停止工作。您可以阅读有关此问题的更多信息here。如果您不希望您的块经常失败,这可能不是一个严重的问题。

【讨论】:

  • 可以在hereCreateDynamicBatchBlock 方法)中找到基于某些“重量”或“尺寸”属性创建批次的自定义 BatchBlock&lt;T&gt;
【解决方案3】:

假设MyGenerator()返回List&lt;Task&lt;T&gt;&gt;,并且任务数量相对较少(即使数百个也可能没问题),那么您可以使用Task.WhenAny(),它返回第一个完成的Task。然后从列表中删除Task,处理结果,然后继续下一个:

var tasks = MyGenerator();
while (tasks.Count > 0) {
    var t = Task.WhenAny(tasks);
    tasks.Remove(t);

    var result = await t; // this won't actually wait since the task is already done
    // Do something with result
}

Stephen Toub 的一篇文章对此进行了很好的讨论,该文章进行了更详细的解释,并在您的任务列表有数千个时提供了替代方案:Processing tasks as they complete

还有这篇文章,但我认为斯蒂芬的写得更好:Process asynchronous tasks as they complete (C#)

【讨论】:

  • 实现MyGenerator() 对我来说是一个具有挑战性的部分
  • 也许第二篇文章会有所帮助。在该示例中,它获取 URL 列表并创建从每个 URL 下载的任务列表。
  • 但是我对如何处理任务列表没有任何问题。我正在努力弄清楚如何首先构建任务列表
  • 您的问题没有足够的信息来帮助您。每个任务实际上在做什么?
  • “返回第一个完成的任务。” -- Gabriel 准确地说,它返回完成的tasks 列表中最左边的任务。如果您在 while 循环内执行任何重要操作,则已完成的任务可能会开始累积,然后您将观察到左侧偏差。
【解决方案4】:

使用TaskCompletionSource:

IEnumerable<Task<Output>> Process(IEnumerable<Input> inputs)
{
    var tcss = new List<TaskCompletionSource<Output>>();
    var queue = new Queue<(Input, TaskCompletionSource<Output>)>();
    var cumulativeLength = 0;
    foreach (var input in inputs)
    {
        var tcs = new TaskCompletionSource<Output>();
        queue.Enqueue((input, tcs));
        tcss.Add(tcs);
        cumulativeLength += input.Length;
        if (cumulativeLength > 10)
        {
            cumulativeLength = 0
            var queueClone = Queue<(Input, TaskCompletionSource<Input>)>(queue);
            queue.Clear();
            GetFromAPI(queueClone.Select(x => x.Item1)).ContinueWith((apiTask) => {
                Queue<BatchResult> batchResults = apiTask.Result;
                while (queueClone.Count > 0)
                {
                    var batchResult = batchResults.Dequeue();
                    var (queuedInput, queuedTcs) = queueClone.Dequeue();
                    var output = MakeOutput(queuedInput, batchResult);
                    queuedTcs.SetResult(output)
                }
            });
        }
    }
    GetFromAPI(queue.Select(x => x.Item1)).ContinueWith((apiTask) => {
        Queue<BatchResult> batchResults = apiTask.Result;
        while (queue.Count > 0)
        {
            var batchResult = batchResults.Dequeue();
            var (queuedInput, queuedTcs) = queue.Dequeue();
            var output = MakeOutput(queuedInput, batchResult);
            queuedTcs.SetResult(output)
        }
    });
    foreach (var tcs in tcss)
    {
        yield return tcs.Task;
    }
}

【讨论】:

  • 您应该使用await 而不是ContinueWith。使用TaskCompletionSource&lt;T&gt; 时,强烈考虑RunContinuationsAsynchronously。队列的使用有点奇怪;我相信这些可以很容易地成为列表(具有预先分配的容量)。肯定存在 TCS 永远不会完成的逻辑流程(例如,错误情况)。整个逻辑最终在第一个项目产生之前调用 API k 次,所以它比必要的复杂得多 - 只需批量输入,多次调用 API,然后 yield 返回结果任务。
  • @StephenCleary 似乎没有优势,即在第一次 yield 之前调用 API k 次意味着调用代码不必在批次之间等待很长时间?
  • 是的。我所说的“比必要更复杂”的意思是,您可以在批处理后立即调用 API k 次,然后遍历生成的任务,等待每个任务。我会用代码更新我的答案。
猜你喜欢
  • 1970-01-01
  • 2017-11-15
  • 2018-05-16
  • 2022-11-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-09-13
相关资源
最近更新 更多