【问题标题】:How do I execute several Tasks and collect their output? [closed]如何执行多个任务并收集它们的输出? [关闭]
【发布时间】:2018-01-23 20:16:01
【问题描述】:

我正在尝试并行运行多个 Web 服务并将它们的结果收集到一个容器中。我很难弄清楚正确的语法。

public async Task<IEnumerable<Rate>> GetRates(Address originAddress, Address destinationAddress,Package package)
{
    var rates = new List<Rate>();
    var tasks = Carriers.Where(carrier => carrier.Enabled)
        .Select(async carrier => 
        {
            try
            {
                rates = await carrier.GetRates(originAddress, destinationAddress, package);
            }
            finally
            {
            }
        });

     await Task.WhenAll(tasks).ConfigureAwait(false);

    return rates;
}

【问题讨论】:

  • 使用 rates.AddRange(await carrier.GetRates.....)

标签: c# .net asynchronous task-parallel-library


【解决方案1】:

一种方法是:

public async Task<IEnumerable<Rate>> GetRates(Address originAddress, Address destinationAddress,Package package)
{
    var rates = new List<Rate>();
    var tasks = Carriers.Where(carrier => carrier.Enabled)
        .Select(async carrier => 
        {
            try
            {
                return carrier.GetRates(originAddress, destinationAddress, package);
            }
            finally
            {
            }
        });

     await Task.WhenAll(tasks).ConfigureAwait(false);
            foreach (var item in tasks)
            {
                rates.AddRange(item.Result);
            }
    return rates;
}

另一种方法是将 GetRates 中的列表添加到 ConcurrentBag。测试两者,看看哪个对你自己来说更快;-)

    public async Task<IEnumerable<string>> GetRates()
    {
        var rates = new ConcurrentBag<Rate>();
        var tasks = rates.Where(carrier => carrier.Enabled)
            .Select(async carrier =>
            {
                try
                {
                    await Task.Run(async () =>
                    {
                        var t = await Task.Run(() => carrier.GetRates...
                        foreach (var item in t)
                        {
                            rates.Add(item);
                        }
                    });

                }
                finally
                {
                }
            }); 

        await Task.WhenAll(tasks).ConfigureAwait(false);
        return rates;
    }

还有另一个解决方案,基于 felix-b 的解决方案。好吧,我非常喜欢它,所以我决定将它作为一种扩展方法。扩展看起来像这样:

public static IEnumerable<Task<T>> AsItCompletes<T>(this IEnumerable<Task<T>> taskList)
        {
            var tasks = taskList.ToList();
            var sources = tasks.Select(x => new TaskCompletionSource<T>()).ToList();

            int currentIndex = -1;
            foreach (var task in tasks)
            {
                task.ContinueWith(completed =>
                {
                    var next = sources[Interlocked.Increment(ref currentIndex)];
                    if (completed.IsFaulted)
                    {
                        next.SetException(completed.Exception);
                    }
                    else if (completed.IsCanceled)
                    {
                        next.SetCanceled();
                    }
                    else
                    {
                        next.SetResult(completed.Result);
                    }
                }, TaskContinuationOptions.ExecuteSynchronously);
            }

            return sources.Select(source => source.Task);
        }

生成的代码更紧凑,更易于阅读(无需双重等待,无需删除):

public async Task<IEnumerable<Rate>> GetRates(
    Address originAddress, Address destinationAddress, Package package)
{
    List<Task<Rate[]>> mappedTasks = Carriers
        .Where(c => c.Enabled)
        .Select(carrier => ProcessOneCarrier(
            carrier, originAddress, destinationAddress, package))
        .ToList();

    List<Rate> reducedResults = new List<Rate>();

    foreach (var task in mappedTasks.AsItCompletes())
    {
        Rate[] rates = await task;
        if (task.Exception != null)
        {
            // Handle Exception
        }
        reducedResults.AddRange(rates);           
    }

    return reducedResults;
}

async Task<Rate[]> ProcessOneCarrier(
    Carrier carrier, Address originAddress, Address destinationAddress, Package package)
{
    var rates = await carrier.GetRates(originAddress, destinationAddress, package);
    return rates.ToArray();
}

【讨论】:

  • 很好的答案!我有一个问题。随着每项任务的完成,我想捕捉任何异常并继续执行其他异常。这仅仅是用 try...catch 块包围 foreach 块的问题吗? SetException 有点神秘。
  • 不。在等待之后检查 task.Exception 是否有值。我应该将它添加到示例中。像 if (task.Exception != null) 做某事
  • 所以我将创建一个 AggregateAcception 添加任何任务例外。如果AggregateException在任务完成时出现异常,那么抛出它?
  • 我认为你根本不需要 AggregateException。如果任务失败,则抛出的异常将在 task.Exception 上。你如何处理它取决于你,取决于发生了什么......我添加了一个 sn-p 用于异常处理
  • 我可能不需要它,但是如果所有运营商都抛出异常,我想一次性返回给调用者。
【解决方案2】:

这是一个不需要锁定的更有效的解决方案(ConcurrentBag 是不必要的):

public async Task<IEnumerable<Rate>> GetRates(
    Address originAddress, Address destinationAddress, Package package)
{
    List<Task<Rate[]>> mappedTasks = Carriers
        .Where(c => c.Enabled)
        .Select(carrier => ProcessOneCarrier(
            carrier, originAddress, destinationAddress, package))
        .ToList();

    List<Rate> reducedResults = new List<Rate>();

    while (mappedTasks.Count > 0)
    {
        var finishedTask = await Task.WhenAny(mappedTasks);
        Rate[] rates = await finishedTask;
        reducedResults.AddRange(rates);
        mappedTasks.Remove(finishedTask);
    }

    return reducedResults;
}

async Task<Rate[]> ProcessOneCarrier(
    Carrier carrier, Address originAddress, Address destinationAddress, Package package)
{
    var rates = await carrier.GetRates(originAddress, destinationAddress, package);
    return rates.ToArray();
}

基于Start Multiple Async Tasks and Process Them As They Complete 样本。

【讨论】:

  • 是的。更好的解决方案
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-06-20
  • 1970-01-01
  • 2016-03-20
相关资源
最近更新 更多