【问题标题】:Parallel iteration in C#?C#中的并行迭代?
【发布时间】:2010-10-06 02:38:17
【问题描述】:

有没有办法在 C# 中对并行枚举进行 foreach 样式迭代?对于可下标的列表,我知道可以使用常规的 for 循环在索引范围内迭代 int,但出于多种原因,我更喜欢 foreachfor

如果它在 C# 2.0 中工作,则加分

【问题讨论】:

  • 难道 for 循环不是更简单、更短、更易读的解决方案,而不是下面的组合响应吗?在这种情况下,您选择 foreach 的原因是什么
  • 并行迭代也刚刚出现在 Ruby 1.9 中,所以我敢打赌它现在还没有出现在 C# 中。虽然 LISP 有 :)
  • 我不确定我是否正确理解了这个问题。您是在尝试并行迭代多个可枚举项,还是尝试遍历一个可枚举项,并行处理不同的项目?
  • 我说的是并行的多个枚举。就像平行排列的铅笔和纸一样。我希望每支铅笔都写在相应的纸上。

标签: c# loops foreach iteration


【解决方案1】:

.NET 4 的 BlockingCollection 使这非常容易。创建一个 BlockingCollection,在 enumerable 方法中返回它的 .GetConsumingEnumerable()。然后 foreach 简单地添加到阻塞集合中。

例如

private BlockingCollection<T> m_data = new BlockingCollection<T>();

public IEnumerable<T> GetData( IEnumerable<IEnumerable<T>> sources )
{
    Task.Factory.StartNew( () => ParallelGetData( sources ) );
    return m_data.GetConsumingEnumerable();
}

private void ParallelGetData( IEnumerable<IEnumerable<T>> sources )
{
    foreach( var source in sources )
    {
        foreach( var item in source )
        {
            m_data.Add( item );
        };
    }

    //Adding complete, the enumeration can stop now
    m_data.CompleteAdding();
}

希望这会有所帮助。 顺便说一句,我昨晚posted a blog about this

安德烈

【讨论】:

  • 这并没有回答问题,这是关于一起枚举多个枚举。
【解决方案2】:

简短的回答,不。 foreach 一次只能处理一个可枚举对象。

但是,如果您将并行枚举组合成一个,则可以foreach 超过组合。我不知道有什么简单的内置方法可以做到这一点,但是以下应该可以工作(尽管我没有测试过):

public IEnumerable<TSource[]> Combine<TSource>(params object[] sources)
{
    foreach(var o in sources)
    {
        // Choose your own exception
        if(!(o is IEnumerable<TSource>)) throw new Exception();
    }

    var enums =
        sources.Select(s => ((IEnumerable<TSource>)s).GetEnumerator())
        .ToArray();

    while(enums.All(e => e.MoveNext()))
    {
        yield return enums.Select(e => e.Current).ToArray();
    }
}

然后你可以foreach 覆盖返回的枚举:

foreach(var v in Combine(en1, en2, en3))
{
    // Remembering that v is an array of the type contained in en1,
    // en2 and en3.
}

【讨论】:

  • 为什么选择 object[] 而不是 IEnumerable 作为参数类型?它会消除异常,不是吗?
  • 至少有一个版本的 C# 语言不会用 object[] 以外的任何东西编译参数。考虑到这个答案已有 5 年历史,我猜当时我使用的唯一版本是不适用于 params IEnumerable&lt;TSource&gt;[] sources 的版本。这些天来,我当然会使用更明确的类型。我可能还会对两个可枚举项使用 IEnumerable&lt;T&gt;.Zip 扩展方法 - 可以肯定,五年前这也不存在。
【解决方案3】:

Zooba 的回答不错,不过你可能还想看看"How to iterate over two arrays at once" 的回答。

【讨论】:

    【解决方案4】:

    我从 .NET4 Parallel 库中编写了 EachParallel() 的实现。它与 .NET 3.5 兼容:Parallel ForEach Loop in C# 3.5 用法:

    string[] names = { "cartman", "stan", "kenny", "kyle" };
    names.EachParallel(name =>
    {
        try
        {
            Console.WriteLine(name);
        }
        catch { /* handle exception */ }
    });
    

    实施:

    /// <summary>
    /// Enumerates through each item in a list in parallel
    /// </summary>
    public static void EachParallel<T>(this IEnumerable<T> list, Action<T> action)
    {
        // enumerate the list so it can't change during execution
        list = list.ToArray();
        var count = list.Count();
    
        if (count == 0)
        {
            return;
        }
        else if (count == 1)
        {
            // if there's only one element, just execute it
            action(list.First());
        }
        else
        {
            // Launch each method in it's own thread
            const int MaxHandles = 64;
            for (var offset = 0; offset < list.Count() / MaxHandles; offset++)
            {
                // break up the list into 64-item chunks because of a limitiation             // in WaitHandle
                var chunk = list.Skip(offset * MaxHandles).Take(MaxHandles);
    
                // Initialize the reset events to keep track of completed threads
                var resetEvents = new ManualResetEvent[chunk.Count()];
    
                // spawn a thread for each item in the chunk
                int i = 0;
                foreach (var item in chunk)
                {
                    resetEvents[i] = new ManualResetEvent(false);
                    ThreadPool.QueueUserWorkItem(new WaitCallback((object data) =>
                    {
                        int methodIndex = (int)((object[])data)[0];
    
                        // Execute the method and pass in the enumerated item
                        action((T)((object[])data)[1]);
    
                        // Tell the calling thread that we're done
                        resetEvents[methodIndex].Set();
                    }), new object[] { i, item });
                    i++;
                }
    
                // Wait for all threads to execute
                WaitHandle.WaitAll(resetEvents);
            }
        }
    }
    

    【讨论】:

      【解决方案5】:

      如果您想坚持基础 - 我以更简单的方式重写了当前接受的答案:

          public static IEnumerable<TSource[]> Combine<TSource> (this IEnumerable<IEnumerable<TSource>> sources)
          {
              var enums = sources
                  .Select (s => s.GetEnumerator ())
                  .ToArray ();
      
              while (enums.All (e => e.MoveNext ())) {
                  yield return enums.Select (e => e.Current).ToArray ();
              }
          }
      
          public static IEnumerable<TSource[]> Combine<TSource> (params IEnumerable<TSource>[] sources)
          {
              return sources.Combine ();
          }
      

      【讨论】:

        【解决方案6】:

        这对你有用吗?

        public static class Parallel
        {
            public static void ForEach<T>(IEnumerable<T>[] sources,
                                          Action<T> action)
            {
                foreach (var enumerable in sources)
                {
                    ThreadPool.QueueUserWorkItem(source => {
                        foreach (var item in (IEnumerable<T>)source)
                            action(item);
                    }, enumerable);
                }
            }
        }
        
        // sample usage:
        static void Main()
        {
            string[] s1 = { "1", "2", "3" };
            string[] s2 = { "4", "5", "6" };
            IEnumerable<string>[] sources = { s1, s2 };
            Parallel.ForEach(sources, s => Console.WriteLine(s));
            Thread.Sleep(0); // allow background threads to work
        }
        

        对于 C# 2.0,您需要将上面的 lambda 表达式转换为委托。

        注意:此实用程序方法使用后台线程。您可能想要修改它以使用前台线程,并且可能您想要等到所有线程完成。如果你这样做,我建议你创建sources.Length - 1 线程,并将当前正在执行的线程用于最后一个(或第一个)源。

        (我希望我可以在我的代码中包含等待线程完成,但很抱歉我还不知道该怎么做。我想你应该使用 a WaitHandle Thread.Join().)

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2010-11-07
          • 2013-06-15
          • 2015-08-24
          • 1970-01-01
          • 1970-01-01
          • 2023-03-06
          • 1970-01-01
          • 2018-10-18
          相关资源
          最近更新 更多