【问题标题】:Multithreading BlockingCollection same value多线程 BlockingCollection 相同的值
【发布时间】:2014-05-15 01:03:13
【问题描述】:

我在 C# 应用程序中使用两个线程访问相同的 BlockingCollection。这很好用,但我想两次检索第一个值,以便两个线程检索相同的值 *。

几秒钟后,我想轮询两个线程的 currentIndex 并删除每个值

我怎样才能做到这一点?我想我需要另一种类型的缓冲区..?

提前谢谢你!

*如果thread1调用了.Take(),则该item在collection中被移除,thread2无法再次获取相同的item。


更新:

我想将数据存储在缓冲区中,例如 thread1 将数据保存到 HDD,thread2 分析(相同)数据(并发)。

【问题讨论】:

  • 你的问题很不寻常。我怀疑它与您的程序设计有关。你能给我们更多关于你的程序做什么的信息——特别是数据流吗?我怀疑我们可以为您提供更好的选择。

标签: c# multithreading blockingcollection


【解决方案1】:

使用生产者-消费者将 Value1 添加到两个单独的 ConcurrentQueue。让线程出队,然后从自己的队列中处理它们。

编辑 2014 年 7 月 4 日: 这是一个模糊的、笨拙的、经过深思熟虑的解决方案:创建一个缓冲的自定义对象。它可以包含您尝试在线程 1 中缓冲的信息和线程 2 中的分析结果的空间。

将对象添加到线程 1 中的缓冲区和 BlockingCollection。使用线程 2 分析结果并使用结果更新对象。阻塞集合不应该变得太大,因为它只处理引用不应该影响你的记忆。这假设您不会同时在两个线程上修改缓冲区中的信息。

另一个,也是经过深思熟虑的解决方案是将信息同时输入缓冲区和阻塞集合。分析来自 BlockingCollection 的数据,将其输入输出集合并再次将它们与缓冲区匹配。如果操作正确,此选项可以处理并发修改,但可能需要更多工作。

我认为选项一更好。正如我所指出的,这些只是半成品,但它们可能会帮助您找到适合您特定需求的东西。祝你好运。

【讨论】:

  • 感谢您的回答。所以不可能有一个由两个线程共享的缓冲区并使用相同的值?
  • 我并不是说它不能完成,但我想不出办法。我也不确定它应该完成。通常,并发集合会删除元素以阻止您尝试执行的操作,因为它通常是不可取的。如果你真的必须破解,也许将值添加到非并发链表并读取它们,暂停和删除将起作用。但我想你会后悔走这条路。
  • 嗨,Odrai,我的编辑有帮助吗?我不确定堆栈溢出是否会发送编辑警报,所以您可能没有看到它。
  • 目前我认为最好的解决方案是每个线程都可以从自己的队列中处理。 user743414 提供了相同的解决方案。如果没有人提出更好的解决方案,您将获得 50 REP。
【解决方案2】:

我建议重新考虑您的设计。

当你有一个必须处理的项目列表时,然后给每个线程一个他必须处理的项目队列。

有了这样的解决方案,给两个或多个线程相同的值来处理不会有问题。

类似的东西,没有测试,只是输入。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections.Concurrent;

namespace ConsoleApplication2
{

  class Item
  {
    private int _value;
    public int Value
    {
      get
      {
        return _value;
      }
    }

    // all you need
    public Item(int i)
    {
      _value = i;
    }
  }

  class WorkerParameters
  {
    public ConcurrentQueue<Item> Items = new ConcurrentQueue<Item>();
  }

  class Worker
  {
    private Thread _thread;
    private WorkerParameters _params = new WorkerParameters();

    public void EnqueueItem(Item item)
    {
      _params.Items.Enqueue(item);
    }

    public void Start()
    {
      _thread = new Thread(new ParameterizedThreadStart(ThreadProc));
      _thread.Start();
    }

    public void Stop()
    {
      // build somthing to stop your thread
    }

    public static void ThreadProc(object threadParams)
    {
      WorkerParameters p = (WorkerParameters)threadParams;
      while (true)
      {
        while (p.Items.Count > 0)
        {
          Item item = null;
          p.Items.TryDequeue(out item);

          if (item != null)
          {
            // do something
          }

        }
        System.Threading.Thread.Sleep(50);
      }
    }
  }

  class Program
  {
    static void Main(string[] args)
    {

      Worker w1 = new Worker();
      Worker w2 = new Worker();
      w1.Start();
      w2.Start();

      List<Item> itemsToProcess = new List<Item>();
      for (int i = 1; i < 1000; i++)
      {
        itemsToProcess.Add(new Item(i));
      }

      for (int i = 1; i < 1000; i++)
      {
        w1.EnqueueItem(itemsToProcess[i]);
        w2.EnqueueItem(itemsToProcess[i]);
      }


    }
  }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-11-15
    • 2011-12-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多