【问题标题】:Unpredictable results when using Parallel.For使用 Parallel.For 时的不可预测的结果
【发布时间】:2016-02-19 06:53:31
【问题描述】:

我正在运行一个 Parallel for 循环,该循环最初运行时间 = 处理器数并执行长时间运行的操作。每个任务完成后,检查更多任务,如果找到,再次调用自身。

我的代码如下所示:

static void Main(string[] args)
{
   Int32 numberOfProcessors = Environment.ProcessorCount;

   Parallel.For(0, numberOfProcessors, index => DoSomething(index, sqsQueueURL));

}

private async static Task DoSomething(int index, string queueURL)
{
   var receiveMessageRequest = new ReceiveMessageRequest { QueueUrl = queueURL, WaitTimeSeconds = 20, MaxNumberOfMessages = 1, VisibilityTimeout = 1200 };

   AmazonSQSClient sqsClient = new AmazonSQSClient(new AmazonSQSConfig { MaxErrorRetry = 4 });

   var receiveMessageResponse = sqsClient.ReceiveMessage(receiveMessageRequest);

   foreach (var msg in receiveMessageResponse.Messages)
   {
      PerformALongRunningTask......

      //delete the message

      DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueURL, msg.ReceiptHandle);

      AmazonSQSClient sqsDeleteClient = new AmazonSQSClient();

      sqsDeleteClient.DeleteMessage(deleteMessageRequest);

      //Do it again
      DoSometing(index,queueURL)

   }
}

我得到了非常不可预测的结果。它永远不会完成所有任务。它在完成所有操作之前退出。

我在这里做错了什么?

短代码:

static Int32 TimesToLoop = 143;
static void Main(string[] args)
{

    Int32 numberOfProcessors = Environment.ProcessorCount;

    Parallel.For(0, numberOfProcessors, index => DoSomething(index));

    Console.Read();
}

private async static Task DoSomething(int index)
{
    if(TimesToLoop == 0)
    {
        return;
    }
    Console.WriteLine(index);
    Interlocked.Decrement(ref TimesToLoop);
    DoSomething(index++);
    return;

}

【问题讨论】:

  • 要做的第一件事:将其简化为一个简短但完整的示例,它根本不涉及 Amazon Web Services。问题在于您使用Parallel.For。让你的方法只打印“开始”和索引,然后等待Thread.Delay,然后打印“结束”,索引将展示同样的问题,依赖更少,代码更少。
  • 顺便问一下,您的真实代码是否真的包含await 表达式?如果没有,对您的 DoSomething 方法的每次调用都将同步运行...
  • 没有。它没有。不知道如何在 Parallel.For 中使用 await
  • 我在询问您的 DoSomething 方法,您已将其标记为 async 方法,但它不包含 await 表达式,包括递归调用它时。听起来您可能需要阅读更多关于 async/await...
  • 在DoSomething里面,我还有其他包含await的操作。

标签: c# amazon-web-services task-parallel-library amazon-sqs parallel.foreach


【解决方案1】:

我现在看到各种问题:

  • Parallel.For 刚刚开始任务。它不会等待他们完成。它将等待DoSomething 方法调用返回,但它们正在返回代表异步操作的任务,这些操作可能不会同步完成。
  • 正如 CarbineCoder 所指出的,您的递归几乎可以肯定是有缺陷的。目前尚不清楚您要实现什么目标,但您需要重新考虑这一方面
  • 您的递归不会await 无论如何返回的任务 - 它几乎肯定应该。它可能想要创建在foreach 循环中创建的所有任务的集合并一次性等待它们,或者它可能想要立即await 它们。我们无法判断。

修复第一部分最简单的方法可能是使用Task.WaitAll而不是Parallel.For

var tasks = Enumerable.Range(0, numberOfProcessors)
                      .Select(index => DoSomething(index, sqsQueueURL))
                      .ToList();
Task.WaitAll(tasks);

Task.WhenAll 不同,Task.WaitAll 将阻塞直到所有指定的任务完成。请注意,如果任何任务需要在调用WaitAll 的线程上继续执行,这是安全的,正是因为它阻塞了 - 但如果这是一个控制台应用程序并且你从初始线程,你会没事的,因为无论如何都会在线程池上执行延续。

【讨论】:

  • 我想做的是,一旦一项任务完成,它应该开始一个新的,因为会有很多任务。我试图限制任务数量等于处理器数量。当没有更多消息要处理时,它应该终止。
  • @Asdfg:我建议你不要那样做。您正在调用 Web 服务 - 无论如何您都不会受到 CPU 的限制。即使您只有一个处理器,您也可以同时向 Web 服务发送 100 个请求。你有工作代码吗?在你开始尝试微调之前,让它尽可能简单地工作。,
  • 是的,但我正在运行多个节点,所以如果要等待其他任务完成,我不想锁定消息。
  • @Asdfg:恐怕我不知道该评论的上下文是什么......它似乎与我之前的评论无关。我已经解释了你看到的行为,但我真的认为你需要退后一步,更仔细地考虑你想要实现的目标。
  • 谢谢乔恩。感谢你的帮助。我将所有内容都转换回了 Synchronous 并且它正在工作。稍后我将处理多线程野兽。
【解决方案2】:
private async static Task DoSomething(int index, string queueURL)
{
   ...
   foreach (var msg in receiveMessageResponse.Messages)
   {
      ...
      //Do it again
      DoSometing(index,queueURL)

   }
}

您正在递归调用DoSomething,并且没有条件中断/返回它。这可能会导致 stackoverflow 并终止您的程序。

【讨论】:

  • 是的。刚刚注意到
  • @carbinecoder 有条件中断/返回对吗?当 receiveMessageResponse.Messages 中没有消息时
  • 没错,我错过了,我同意你的观点,应该在receiveMessageResponse.Messages 可用之前完成轮询,但它不必是递归调用。
猜你喜欢
  • 2020-02-25
  • 2013-08-12
  • 2018-01-17
  • 2011-06-26
  • 2011-11-30
  • 1970-01-01
  • 2015-08-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多