【问题标题】:Using threads and semaphores [duplicate]使用线程和信号量[重复]
【发布时间】:2021-08-05 03:52:54
【问题描述】:

我有几千个文件,我使用函数提取一些信息,然后将信息输入数据库。从磁盘读取文件速度很快。提取数据很慢。更新数据库很快。我想在多个线程上执行这个操作,以更好地利用 CPU。我设置了一个信号量,但没有看到预期的行为。我希望看到程序开始处理三个文件,然后完成一个文件,然后才开始另一个文件。一开始,我看到不止三个文件同时开始处理,而且还没有一个完成。

using System;
using System.Threading;
using System.IO;
using System.Collections.Generic;

namespace Threads
{
    class Program
    {
        static Semaphore semaphore = new Semaphore(3, 3);
        static Queue<string> queue = new Queue<string>();
        
        public static void Main(string[] args)
        {
            string[] files = Directory.GetFiles(@"C:\MyFolder");
            foreach (string file in files) {
                queue.Enqueue(file);
            }
            
            while (queue.Count > 0) {
                string fileName1 = NextFile();
                semaphore.WaitOne();
                Thread thread1 = new Thread(() => ProcessFile(fileName1));
                thread1.Start();            
                semaphore.Release();
            }
            
            Console.Write("Press any key to continue . . . ");
            Console.ReadKey(true);
        }
        
        public static void ProcessFile(string fileName)
        {
            Console.WriteLine("Processing file " + fileName);
            string value = ExtractData(fileName);
            InsertInDatabase(value);
            Console.WriteLine("Completed processing file " + fileName);
        }
        
        public static string NextFile()
        {
            string fileName = queue.Dequeue();
            return fileName;
        }
        
        /// <summary>
        /// This function takes a long time
        /// </summary>
        /// <param name="fileName"></param>
        /// <returns></returns>
        static string ExtractData(string fileName)
        {
            Thread.Sleep(5000);
            return "value";
        }
        
        static void InsertInDatabase(string value)
        {
            Thread.Sleep(100);
            // do some work         
        }
    }
}

【问题讨论】:

  • 附带说明,TPL Dataflow 库有一个很好的用例。你可以看一个例子here
  • "从磁盘读取文件很快。提取数据很慢。更新数据库很快。" - 这听起来与我通常期望的相反。你能提供这些操作的任何基准时间来确认吗?
  • 请考虑使用 Rx 来避免所有信号量的东西。您的代码如下所示:var processed = (from file in Directory.GetFiles(@"C:\MyFolder").ToObservable() from data in Observable.Start(() =&gt; ExtractData(file)) from insert in Observable.Start(() =&gt; InsertInDatabase(data)) select file).ToArray().Wait();.
  • 谜团 - 读取文件(通常大约 1MB 大约需要 400 毫秒,提取数据大约需要 1 分钟(过去是大约 8 秒),保存数据需要另外几百毫秒。我必须看看为什么提取数据需要这么长时间......几分钟后我也收到内存不足错误。
  • @Nick_F - NuGet "System.Reactive" 来获取这些位,然后你需要using System.Reqctive.Linq;

标签: c# multithreading task semaphore


【解决方案1】:

当使用信号量时,实际执行工作的线程应该等待,然后释放信号量。

这里主线程正在等待信号量,然后在启动工作线程后立即释放它。

while (queue.Count > 0) {
    string fileName1 = NextFile();
    semaphore.WaitOne();
    Thread thread1 = new Thread(() => ProcessFile(fileName1));
    thread1.Start();            
    semaphore.Release();
}

您应该将sempahore.WaitOne() 移至ProcessFile 方法,因为这将转到实际使用资源的线程。

public static void ProcessFile(string fileName)
{
    semaphore.WaitOne();
    try
    {
        Console.WriteLine("Processing file " + fileName);
        string value = ExtractData(fileName);
        InsertInDatabase(value);
        Console.WriteLine("Completed processing file " + fileName);
    }
    finally
    {
        // make sure the sempahore releases even if we encounter an error
        semaphore.Release();
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-03-27
    • 1970-01-01
    • 2013-12-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多