【问题标题】:Handle multiple threads, one out one in, in a timed loop在定时循环中处理多个线程,一出一入
【发布时间】:2016-01-28 09:52:51
【问题描述】:

我需要在一夜之间处理大量文件,并确定开始和结束时间,以避免干扰用户。我一直在调查,但现在处理线程的方法太多了,我不确定该走哪条路。这些文件作为附件进入 Exchange 收件箱。

基于此处的一些示例和一些实验,我目前的尝试是:

 while (DateTime.Now < dtEndTime.Value)
 {
            var finished = new CountdownEvent(1);
            for (int i = 0; i < numThreads; i++)
            {


                object state = offset;

                finished.AddCount();
                ThreadPool.QueueUserWorkItem(delegate
                {
                    try
                    {
                        StartProcessing(state);
                    }
                    finally
                    {
                        finished.Signal();
                    }
                });

                offset += numberOfFilesPerPoll;

            }
            finished.Signal();
            finished.Wait(); 


        }

为了方便起见,它目前在 winforms 应用程序中运行,但核心处理在 dll 中,因此我可以从 Windows 服务、在调度程序下运行的控制台生成我需要的类,但这是最简单的。我确实设置了一个带有 Timer 对象的 Windows 服务,该对象在配置文件中设置的时间启动处理。

所以我的问题是——在上面的代码中,我初始化了一堆线程(目前是 10 个),然后等待它们全部处理。我的理想是静态数量的线程,当一个线程完成时,我触发另一个线程,然后当我到达结束时间时,我只等待所有线程完成。 这样做的原因是我正在处理的文件是可变大小的 - 有些可能需要几秒钟来处理,有些可能需要几个小时,所以我不希望整个应用程序等待一个线程完成,如果我可以让它滴答作响在后台。 (编辑)就目前而言,每个线程实例化一个类并传递一个偏移量。然后该类从收件箱中获取下 x 封电子邮件,从偏移量开始(使用 Exchange Web 服务分页功能)。随着每个文件的处理,它被移动到一个单独的文件夹中。从到目前为止的一些回复中,我想知道我是否真的应该在外循环中获取电子邮件,并根据需要生成线程。 为了解决这个问题,我目前正在尝试处理积压的电子邮件。清除积压后,夜间运行的负载可能会显着降低。

平均每晚要处理大约 1000 个文件。

更新

我重写了大部分代码,以便可以使用 Parallel.Foreach,但我遇到了线程安全问题。调用代码现在如下所示:

public bool StartProcessing()
        {

            FindItemsResults<Item> emails = GetEmails();



            var source = new CancellationTokenSource(TimeSpan.FromHours(10));

            // Process files in parallel, with a maximum thread count.
            var opts = new ParallelOptions { MaxDegreeOfParallelism = 8, CancellationToken = source.Token };

            try
            {
                Parallel.ForEach(emails, opts, processAttachment);
            }

            catch (OperationCanceledException)
            {
                Console.WriteLine("Loop was cancelled.");
            }
            catch (Exception err)
            {
                WriteToLogFile(err.Message + "\r\n");
                WriteToLogFile(err.StackTrace + "r\n");
            }
            return true;
        }

到目前为止一切顺利(请原谅临时错误处理)。我现在有一个新问题,即作为电子邮件的“Item”对象的属性不是线程安全的。因此,例如,当我开始处理一封电子邮件时,我将其移动到“处理”文件夹中,以便另一个进程无法抓取它 - 但事实证明,多个线程可能正在尝试处理同一封电子邮件一次。我如何保证不会发生这种情况?我知道我需要添加一个锁,我可以在 ForEach 中添加它还是应该在 processAttachments 方法中添加它?

【问题讨论】:

  • using a thread pool 怎么样?
  • 时间用完了怎么办?你只是停止处理文件吗?处理每个文件与从磁盘读取文件需要多长时间?
  • @Enigmativity - 我实际上正在处理收件箱中电子邮件的附件。不知道你的比较是什么意思?该处理涉及解析附件内容并传递给服务。这需要的时间长度取决于文件的大小和复杂性。首先将文件保存到磁盘并没有显着的收益。回答第一个问题 - 当我用完时间时,我想停止生成新线程,并等待当前线程完成。
  • @EmmaFaulkner - 比较的是加载文件所需的时间(IO 并没有从多线程中得到很大改善)与处理时间(CPU 被多线程大大提高)。
  • @Enigmativity 啊,我明白了。是的,我正在使用多线程来节省时间。在我的情况下,加载文件所花费的时间不是问题,而是解析。电子邮件在一夜之间流入收件箱,因此在每次通过时,我可能有许多线程不做任何事情(或者线程数少于最大值,更正确,但相反我可能有一个或两个需要很长时间处理时间。

标签: c# multithreading timer batch-processing


【解决方案1】:

使用 TPL:

Parallel.ForEach( EnumerateFiles(),
                  new ParallelOptions { MaxDegreeOfParallelism = 10 },
                  file => ProcessFile( file ) );

EnumerateFiles在到达结束时间时停止枚举,就像这样:

IEnumerable<string> EnumerateFiles()
{
    foreach (var file in Directory.EnumerateFiles( "*.txt" ))
        if (DateTime.Now < _endTime)
            yield return file;
        else
            yield break;
}

【讨论】:

  • “使files 在到达结束时间时停止枚举”- 如何?除非您能解释清楚,否则这不是一个非常有用的答案。
【解决方案2】:

您可以将Parallel.ForEach() 与取消令牌源结合使用,这将在设定的时间后取消操作:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static Random rng = new Random();

        static void Main()
        {
            // Simulate having a list of files.
            var fileList = Enumerable.Range(1, 100000).Select(i => i.ToString());

            // For demo purposes, cancel after a few seconds.
            var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));

            // Process files in parallel, with a maximum thread count.
            var opts = new ParallelOptions {MaxDegreeOfParallelism = 8, CancellationToken = source .Token};

            try
            {
                Parallel.ForEach(fileList, opts, processFile);
            }

            catch (OperationCanceledException)
            {
                Console.WriteLine("Loop was cancelled.");
            }
        }

        static void processFile(string file)
        {
            Console.WriteLine("Processing file: " + file);

            // Simulate taking a varying amount of time per file.

            int delay;

            lock (rng)
            {
                delay = rng.Next(200, 2000);
            }

            Thread.Sleep(delay);

            Console.WriteLine("Processed file: " + file);
        }
    }
}

作为使用取消令牌的替代方法,您可以编写一个返回 IEnumerable&lt;string&gt; 的方法,该方法返回文件名列表,并在时间到时停止返回它们,例如:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static Random rng = new Random();

        static void Main()
        {
            // Process files in parallel, with a maximum thread count.
            var opts = new ParallelOptions {MaxDegreeOfParallelism = 8};
            Parallel.ForEach(fileList(), opts, processFile);
        }

        static IEnumerable<string> fileList()
        {
            // Simulate having a list of files.
            var fileList = Enumerable.Range(1, 100000).Select(x => x.ToString()).ToArray();

            // Simulate finishing after a few seconds.
            DateTime endTime = DateTime.Now + TimeSpan.FromSeconds(10);

            int i = 0;

            while (DateTime.Now <= endTime)
                yield return fileList[i++];
        }

        static void processFile(string file)
        {
            Console.WriteLine("Processing file: " + file);

            // Simulate taking a varying amount of time per file.

            int delay;

            lock (rng)
            {
                delay = rng.Next(200, 2000);
            }

            Thread.Sleep(delay);

            Console.WriteLine("Processed file: " + file);
        }
    }
}

请注意,这种方法不需要 try/catch。

【讨论】:

  • 谢谢,我不知道取消令牌 - 看起来这种方法很适合我正在做的事情,尽管我可能需要稍微调整一下我的代码。
  • 以cancellationtoken 为例,这会停止所有当前正在运行的线程,还是允许它们完成并停止循环?
  • @EmmaFaulkner 它阻止任何新的迭代开始,但允许每个线程在抛出异常之前完成其当前迭​​代的处理。
  • 绝对完美!打算试试这个
  • @EmmaFaulkner 请注意,您仍然需要catch (OperationCanceledException),因为将引发异常以指示循环已取消。
【解决方案3】:

您应该考虑使用 Microsoft 的响应式框架。它让您可以使用 LINQ 查询以非常简单的方式处理多线程异步处理。

类似这样的:

var query =
    from file in filesToProcess.ToObservable()
    where DateTime.Now < stopTime
    from result in Observable.Start(() => StartProcessing(file))
    select new { file, result };

var subscription =
    query.Subscribe(x =>
    {
        /* handle result */
    });

真的,如果 StartProcessing 已定义,这就是您需要的所有代码。

只需 NuGet“Rx-Main”。

哦,要随时停止处理,只需致电 subscription.Dispose()

【讨论】:

    【解决方案4】:

    这是一项真正令人着迷的任务,我花了一段时间才将代码提高到我满意的水平。

    我最终得到了上述的组合。

    首先值得注意的是,我在 Web 服务调用中添加了以下几行,因为我遇到的操作超时,我认为是因为我超出了端点上设置的一些限制,实际上是由于微软在 .Net 2.0 中设置的限制:

    ServicePointManager.DefaultConnectionLimit = int.MaxValue;
    ServicePointManager.Expect100Continue = false;
    

    更多信息请看这里:

    What to set ServicePointManager.DefaultConnectionLimit to

    添加这些代码行后,我的处理速度从 10/分钟提高到 100/分钟左右。

    但我仍然对循环和分区等不满意。我的服务转移到物理服务器上以最大程度地减少 CPU 争用,我希望让操作系统决定它的运行速度,而不是我的代码节流它。

    经过一番研究,这就是我最终得到的结果 - 可以说不是我编写的最优雅的代码,但它非常快速且可靠。

    List<XElement> elements = new List<XElement>();
     while (XMLDoc.ReadToFollowing("ElementName"))
         {
       using (XmlReader r = XMLDoc.ReadSubtree())
          {
       r.Read();
       XElement node = XElement.Load(r);
    //do some processing of the node here...
    elements.Add(node);
    }
    }
    //And now pass the list of elements through PLinQ to the actual web service call, allowing the OS/framework to handle the parallelism
    
    int failCount=0; //the method call below sets this per request; we log and continue
    
    failCount = elements.AsParallel()
                                .Sum(element => IntegrationClass.DoRequest(element.ToString()));
    

    它以极其简单和闪电般的速度结束。

    我希望这可以帮助其他尝试做同样事情的人!

    【讨论】:

      猜你喜欢
      • 2023-01-06
      • 1970-01-01
      • 2013-12-10
      • 1970-01-01
      • 1970-01-01
      • 2013-11-13
      • 2017-11-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多