【问题标题】:Unable to implement data parsing in a multi-threaded context using lock无法使用锁在多线程上下文中实现数据解析
【发布时间】:2021-02-06 13:34:38
【问题描述】:

我已经建立了一个程序

  1. 从文件中获取记录数据列表
  2. 解析并清理解析对象中的每条记录
  3. 将其输出到输出文件

到目前为止,这在单线程上有效,但考虑到在某些情况下记录可能超过 100 万条,我们希望在多线程上下文中实现这一点。多线程在 .Net 中对我来说是新事物,我已经尝试过,但它不起作用。下面我将提供更多细节和代码:

主类(简体):

public class MainClass
{
    parseObject[] parseObjects;
    Thread[] threads;
    List<InputLineItem> inputList = new List<InputLineItem>();
    FileUtils fileUtils = new FileUtils();

    public GenParseUtilsThreaded(int threadCount)
    {
        this.threadCount = threadCount;
        Init();
    }

    public void Init()
    {    
        inputList = fileUtils.GetInputList();
        parseObjects = new parseObject[threadCount - 1];
        threads = new Thread[threadCount - 1];
        InitParseObjects();
        Parse();       
     }

    private void InitParseObjects()
    {
//using a ref of fileUtils to use as my lock expression
        parseObjects[0] = new ParseObject(ref fileUtils); 
        parseObjects[0].InitValues();
        for (int i = 1; i < threadCount - 1; i++)
        {
            parseObjects[i] = new parseObject(ref fileUtils);
            parseObjects[i].InitValues();
        }
    }

    private void InitThreads()
    {
        for (int i = 0; i < threadCount - 1; i++)
        {
            Thread t = new Thread(new ThreadStart(parseObjects[0].CleanupAndParseInput));
            threads[i] = t;
        }
    }
    public void Parse()
    {
        try
        {   
            InitThreads();
            int objectIndex = 0;

            foreach (InputLineItem inputLineItem in inputList)
            {
                parseObjects[0].inputLineItem = inputLineItem;
                threads[objectIndex].Start();
                objectIndex++;
                if (objectIndex == threadCount)
                {
                    objectIndex = 0;
                    InitThreads(); //do i need to re-init the threads after I've already used them all once?
                }
            }

        }
        catch (Exception e)
        {
            Console.WriteLine("(286) The following error occured: " + e);
        }

    }


}
}

还有我的 Parse 对象类(也简化了):

public class ParseObject
    {
        public ParserLibrary parser { get; set; }
        public FileUtils fileUtils { get; set; }
        public InputLineItem inputLineItem { get; set; }

        public ParseObject( ref FileUtils fileUtils)
        {
            this.fileUtils = fileUtils;
        }
       
        public void InitValues()
        {
            //relevant config of parser library object occurs here
        }  

        public void CleanupFields()
        {
            parser.Clean(inputLineItem.nameValue);
            inputLineItem.nameValue = GetCleanupUpValueFromParser();  
        }

        private string GetCleanupFieldValue()
        {
            //code to extract cleanup up value from parses
        }

        public void CleanupAndParseInput()
        {
            CleanupFields();
            ParseInput();
        }

        public void ParseInput()
        {

            try
            {
                parser.Parse(InputLineItem.NameValue);
            }
            catch (Exception e)
            {
            }
        
            try
            {
                lock (fileUtils)
                {
                    WriteOutputToFile(inputLineItem);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("(414) Failed to write to output: " + e);
            }
        }

        public void WriteOutputToFile(InputLineItem inputLineItem)
        {
          //writes updated value to output file
        }
    }

我得到的错误是在尝试运行 Parse 函数时,我收到以下消息:

An unhandled exception of type 'System.AccessViolationException' occurred in GenParse.NET.dll
Attempted to read or write protected memory. This is often an indication that other memory is corrupt.

话虽如此,除了导致该错误的原因之外,我觉得我在这里做错的还有很多。

我还有其他问题:

  1. 我是否应该创建多个解析对象并在尝试时将它们迭代地提供给每个线程,还是应该使用一个在每个线程之间共享或克隆的 Parse 对象?
  2. 如果在线程之外,我更改了传递给线程的对象中的值,该更改会反映在传递给线程的对象中吗?即,对象是按值传递还是按引用传递?
  3. 有没有比我目前使用 objectIndex 迭代器更有效的方法将每条记录分配给线程及其解析对象?

谢谢!

【问题讨论】:

  • 我猜测 (1) ParserLibrary 至少部分使用非托管代码实现,(2) 它不是线程安全的。
  • @MatthewWatson 1) 是的 - 它是一个 C 库。 2)是什么使它不是线程安全的,我如何使它成为线程安全的?我看到一个帖子表明 ParserLibrary 应该是 Lock 函数中的表达式才能实现这一点。对吗?
  • 任何访问ParserLibrary 的东西都应该在lock 语句中,是的。有无数种方法可能不是线程安全的,所以如果不研究源代码就不可能准确地说出原因!
  • 我只从我包含的代码中删除了不相关的代码,这样它仍然可以描绘完整的画面。我没有包括图书馆,因为我认为它不相关,而且我也不允许分享它:)

标签: c# asp.net multithreading locking


【解决方案1】:

我是否应该创建多个解析对象并在尝试时将它们迭代地提供给每个线程,还是应该使用一个在每个线程之间共享或克隆的 Parse 对象?

您使用new ThreadStart(parseObjects[0].CleanupAndParseInput) 初始化每个线程,因此所有线程将共享同一个解析对象。解析对象不是线程安全的,这是一个相当安全的赌注。所以每个线程都应该有一个单独的对象。请注意,这可能还不够,如果解析库使用任何全局字段,即使使用单独的对象,它也可能是非线程安全的。

如果在线程之外,我更改了传递给线程的对象中的值,该更改会反映在传递给线程的对象中吗?即对象是按值传递还是引用传递?

对象(即类)通过引用传递。但是,除非发出 memoryBarrier,否则不能保证对对象的任何更改在其他线程中可见。大多数同步代码(如lock)都会发出内存屏障。请记住,如果同时写入和读取字段,则任何非atomic operation 都是不安全的。

有没有比我目前使用 objectIndex 迭代器更有效的方法将每个记录分配给线程及其解析对象?

以这种方式使用手动线程非常老派。现代、更简单且可能更快的方法是使用并行 for 循环。这将尝试明智地了解它将使用多少线程,并尝试调整块大小以保持同步开销较低。

        var items = new List<int>();

        ParseObject LocalInit()
        {
            // Do initalization, This is run once for each thread used
            return new ParseObject();
        }

        ParseObject ThreadMain(int value, ParallelLoopState state, ParseObject threadLocalObject)
        {
            // Do whatever you need to do
            // This is run on multiple threads
            return threadLocalObject;
        }

        void LocalFinally(ParseObject obj)
        {
            // Do Cleanup for each thread
        }

        Parallel.ForEach(items, LocalInit, ThreadMain, LocalFinally);

作为最后一点,我建议不要使用多线程,除非您熟悉它所涉及的潜在危险和陷阱,至少对于结果很重要的任何项目都是如此。有很多方法可以搞砸并制作一个在 99.9% 的时间内可以运行的程序,并在剩余的 0.1% 的时间内默默地破坏数据。

【讨论】:

  • 嗨@JonasH,感谢您抽出宝贵时间给出这个答案,但我仍然想测试您建议的方法。我还有 2 个问题:1)我的理解是否正确,您建议的并行 For 循环方法根本不需要我仍然创建线程对象? 2)目前我有一个 ParseObjects 数组,与所需的线程数一样多,因此由于线程安全问题,每个线程都有自己的。我需要在并行 for 循环中做类似的事情,还是可以使用单个对象?
  • 1) 是的,Parallel.For 将使用线程池线程,因此无需手动创建任何线程。 2)使用 LocalInit/LocalFinally 创建线程特定的对象,这就是它的用途。检查示例。