【问题标题】:Multiple iterations with multithreading多线程的多次迭代
【发布时间】:2019-06-07 10:23:34
【问题描述】:

我重写了最初只针对一个线程的方法,以便与多个线程一起工作。现在,此方法接受两个并发集合:ConcurrentBag,即List,和ConcurrentQueue,即Queue

目的是匹配两个集合中的两个标题并进行一些逻辑,这是ConcurrentBag项目中的简单赋值。我当然知道ConcurrentBag 中的所有符号都在ConcurrentQueue 中。

当我为多线程编写此内容时,发生了一些标题不匹配 (~20%),这在线程上没有发生。只有在调试期间,我才能匹配这些标题,然后分配值。迭代这两个集合一定有一些问题。也许在同一时间许多线程从同一个项目中读取值,但只读取应该没有问题?

以下代码:

public void UpdateWithPercent(ref ConcurrentBag<Book> refList, ConcurrentQueue<Book> list)
{
    var size = list.Count;
    int numProcs = Environment.ProcessorCount;
    var divider = CalculatBiggestDivider(size);
    var nextIteration = 0;
    var remainingWork = numProcs;
    var internalRefList = refList;
    using (ManualResetEvent mre = new ManualResetEvent(false))
    {
        for (int i = 0; i < numProcs; i++)
        {
            ThreadPool.QueueUserWorkItem(delegate
            {
                IEnumerable<Book> partialList;
                while (-(nextIteration - Interlocked.Add(ref nextIteration, (partialList = DequeueChunk(list, divider)).Count()))> 0)
                {
                    foreach (var item in partialList)
                    {
                        foreach (var x in internalRefList)
                        {
                            if (x.Title == item.Title)
                            {
                                x.Orders += item.Orders;
                                break;
                            }
                        };
                    }
                }

                if (Interlocked.Decrement(ref remainingWork) == 0)
                {
                    mre.Set();
                }
            });
        }

        mre.WaitOne();
    }

    refList = internalRefList;
}

private int CalculatBiggestDivider(int count)
{
    var divider = 1;
    for (int i = 30; i > 0; i--)
    {
        if (count % i == 0)
        {
            divider = i;
            break;
        }
    }

    return divider;
}

private IEnumerable<T> DequeueChunk<T>(ConcurrentQueue<T> queue, int chunkSize)
{
    for (int i = 0; i < chunkSize && queue.Count > 0; i++)
    {
        T item;
        bool success = queue.TryDequeue(out item);
        if (!success)
        {
            i = chunkSize;
            continue;
        }
        yield return item;
    }
}

【问题讨论】:

  • ConcurrentBagConcurrentQueue的初衷是什么?这是一个糟糕的性能(嵌套foreach 循环)吗?如果是这样,为什么不将refList 变成Dictionary&lt;string, Book&gt;HashSet&lt;string&gt; 从而摆脱内部foreach
  • 能否请您用通俗易懂的语言描述该例程背后的逻辑?我怀疑多线程是否会更快;但是,将时间复杂度从 O(N * M)(嵌套循环)转变为 O(N + M)Hashset/Dictionary 在单个 foreach 内)是有希望的。
  • @Dmitry Bychenko,是的,你有权利认为嵌套循环并不是最好的性能。实际上,使用 Dictionary 会有很大的好处。我没有考虑。将其重写为多线程仅用于学习目的。我想知道什么在这里不起作用。
  • 线程可以读取相同的键、值等(int x = dict[someKey]; 可以)这是我们应该保护的写入
  • 不幸的是,没有。 Dictionary&lt;K, V&gt;List&lt;T&gt; 在修改(添加/编辑/删除)时都希望独占访问 - 在修改时,其他线程不得从这些集合中读取或写入任何数据。

标签: c# multithreading concurrency


【解决方案1】:

最终我决定退出嵌套循环并使用ConcurrentDictionary。现在可以了。

public void UpdateWithPercent(ref ConcurrentDictionary<string, Book> refList, List<Book> list, int ticker, int maxTimes)
{
    var size = list.Count;
    int numProcs = Environment.ProcessorCount;
    var divider = CalculatBiggestDivider(size);
    var nextIteration = 0;
    var remainingWork = numProcs;
    var internalRefList = refList;
    using (ManualResetEvent mre = new ManualResetEvent(false))
    {
        for (int i = 0; i < numProcs; i++)
        {
            ThreadPool.QueueUserWorkItem(delegate
            {
                int index = 0;
                while ((index = Interlocked.Add(ref nextIteration, divider) - divider) < size)
                {
                    foreach (var item in list.GetRange(index, divider))
                    {
                        Book x;
                        if (internalRefList.TryGetValue(item.Title, out x))
                        {
                                x.Orders += item.Orders;
                        }
                    };
                }

                if (Interlocked.Decrement(ref remainingWork) == 0)
                {
                    mre.Set();
                }
            });
        }

        mre.WaitOne();
    }

    refList = internalRefList;
}

【讨论】:

  • 这个问题其实不需要多线程解决方案。如果需要,最好让框架使用Parallel 类进行分区和线程管理。
  • @Theodor Zoulias,我知道,但我想在没有 Parallel 的情况下这样做。
  • 如果您觉得需要在应用程序代码中使用ManualResetEvent 类和ThreadPool.QueueUserWorkItem 方法,那么您做错了。这些是低级工具,最适合由专家编写和维护的服务器组件和库。
猜你喜欢
  • 1970-01-01
  • 2011-07-29
  • 1970-01-01
  • 2020-08-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-07-11
相关资源
最近更新 更多