【问题标题】:Parallel.Foreach with modulo partitioning带模分区的 Parallel.Foreach
【发布时间】:2017-06-07 15:02:10
【问题描述】:

当使用Parallel.Foreach() 使用 4 个线程处理 100 个项目时,它将列表分成 4 个项目块(0-24,25-49,50-74,75-99),这意味着项目 0, 25、50、75并行处理。

是否有可能以某种方式对项目进行分区以首先处理具有较低索引的项目?喜欢:

Thread 1: 0, 5, 9,..
Thread 2: 1, 6, 10,...
Thread 3: 2, 7, 11,...
Thread 4: 3, 8, 12,...

【问题讨论】:

标签: c# parallel.foreach


【解决方案1】:

这种分区方法称为循环或条带化。将它与Parallel.ForEach() 一起使用的主要挑战是ForEach() 需要分区器来支持动态分区,而这种类型的分区是不可能的,因为在执行循环之前必须固定分区的数量。

实现这种分区的一种方法是创建一个派生自System.Collections.Concurrent.Partitioner<TSource> 的自定义类并使用ParallelQuery.ForAll() 方法,该方法不具备动态分区支持要求。对于大多数应用程序,这应该等同于使用ForEach()

以下是自定义Partitioner 的示例和基本实现。 Partitioner 将生成与并行度相同的分区数。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace RoundRobinPartitioning
{
    public class RoundRobinPartitioner<TSource> : Partitioner<TSource>
    {
        private readonly IList<TSource> _source;

        public RoundRobinPartitioner(IList<TSource> source)
        {
            _source = source;
        }

        public override bool SupportsDynamicPartitions { get { return false; } }

        public override IList<IEnumerator<TSource>> GetPartitions(int partitionCount)
        {
            var enumerators = new List<IEnumerator<TSource>>(partitionCount);

            for (int i = 0; i < partitionCount; i++)
            {
                enumerators.Add(GetEnumerator(i, partitionCount));
            }

            return enumerators;
        }

        private IEnumerator<TSource> GetEnumerator(
            int partition,
            int partitionCount)
        {
            int position = partition;
            TSource value;

            while (position < _source.Count)
            {
                value = _source[position];
                position += partitionCount;
                yield return value;
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var values = Enumerable.Range(0, 100).ToList();

            var partitioner = new RoundRobinPartitioner<int>(values);

            partitioner.AsParallel()
                .WithDegreeOfParallelism(4)
                .ForAll(value =>
                {
                    // Perform work here
                });
        }
    }
}

【讨论】:

  • 这正是我在问题中所问的。我忘了说:我想 Break()。 ForAll() 有可能吗?我可以设置一个中断标志,在要执行的操作中首先检查它,但我仍然必须遍历所有项目。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-02-13
  • 1970-01-01
  • 2011-09-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多