【问题标题】:C# One Writer Many Readers read exactly one timeC# 一位作家 许多读者只读了一次
【发布时间】:2013-03-25 21:07:35
【问题描述】:

我有 4 个线程。一种是从网络中读取一些信息,并将其写入变量中,并且应该在每个片段之后发出信号。其中 3 个正在读取这个变量,并且应该只读取一次。当前的解决方案是写入器在写入后设置事件并等待读取器事件。读者等待事件,然后读取并设置他们的事件(意味着他们阅读)。问题是读者可以阅读不止一次,而我在其中有重复。如何实现读者只读一次的规则?

【问题讨论】:

  • 我希望从任何可行的解决方案开始,然后做得更好
  • 我认为读者应该处理重复的写入消息。我认为您不应该让客户来假设更改事件只会触发一次。例如,有些网络场景可能会导致这种行为。

标签: c# .net multithreading synchronization


【解决方案1】:

一种实现方式如下

数据作为单链表在线程之间共享。列表中的每个节点都可以是标记或具有数据。该列表以单个节点开始,该节点键入到标记。当读取数据时,会形成一个新列表,其中包含一系列数据节点,后跟一个标记。此列表附加到添加到列表中的最新标记。

每个阅读器线程都以对原始标记节点和AutoResetEvent 的引用开始。每当写入器中有一条新数据进入时,它都会为每个读取器线程发出AutoResetEvent 信号。然后,阅读器线程将简单地遍历,直到找到没有 Next 节点的标记。

此方案确保所有阅读器只能看到一次数据。最大的复杂性是构建列表,以便可以以无锁方式对其进行写入和读取。这对Interlocked.CompareExchange 来说非常简单

链表类型

class Node<T> { 
  public bool IsMarker;
  public T Data;
  public Node<T> Next;
}

示例编写器类型

class Writer<T> {
  private List<AutoResetEvent> m_list; 
  private Node<T> m_lastMarker;

  public Writer(List<AutoResetEvent> list, Node<T> marker) { 
    m_lastMarker = marker;
    m_list = list;
  }

  // Assuming this can't overlap.  If this can overload then you will
  // need synchronization in this method around the writing of 
  // m_lastMarker      
  void OnDataRead(T[] items) {
    if (items.Length == 0) {
      return;
    }

    // Build up a linked list of the new data followed by a 
    // Marker to signify the end of the data.  
    var head = new Node<T>() { Data = items[0] };
    var current = head;
    for (int i = 1; i < items.Length; i++) {
      current.Next = new Node<T>{ Data = items[i] };
      current = current.Next;
    }
    var marker = new Node<T> { IsMarker = true };
    current.Next = marker;

    // Append the list to the end of the last marker node the writer
    // created 
    m_lastMarker.Next = head;
    m_lastMarker = marker;

    // Tell each of the readers that there is new data 
    foreach (var e in m_list) { 
      e.Set();
    }
  }
}

示例阅读器类型

class Reader<T> { 
  private AutoResetEvent m_event;
  private Node<T> m_marker;

  void Go() {
    while(true) { 
      m_event.WaitOne();
      var current = m_marker.Next;
      while (current != null) { 
        if (current.IsMarker) { 
          // Found a new marker.  Always record the marker because it may 
          // be the last marker in the chain 
          m_marker = current;
        } else { 
          // Actually process the data 
          ProcessData(current.Data);
        }
        current = current.Next;
      }
    }
  }
}

【讨论】:

    【解决方案2】:

    我同意您应该对消费者线程进行编码以接受多次获得相同值的可能性的评论。也许最简单的方法是为每个更新添加一个顺序标识符。这样,线程可以将顺序 id 与它读取的最后一个 id 进行比较,并知道它是否得到了重复。

    它也会知道它是否错过了一个值。

    但如果你真的需要它们同步并且只获得一次值,我建议你使用两个ManualResetEvent 对象和一个CountdownEvent。以下是它们的使用方法。

    ManualResetEvent DataReadyEvent = new ManualResetEvent();
    ManualResetEvent WaitForResultEvent = new ManualResetEvent();
    CountdownEvent Acknowledgement = new CountdownEvent(NumWaitingThreads);
    

    阅读器线程等待DataReadyEvent

    当另一个线程从网络中读取一个值时,它会这样做:

    Acknowledgement.Reset(NumWaitingThreads);
    DataReadyEvent.Set();  // signal waiting threads to process
    Acknowledgement.WaitOne();  // wait for all threads to signal they got it.
    DataReadyEvent.Reset(); // block threads' reading
    WaitForResultEvent.Set(); // tell threads they can continue
    

    等待线程这样做:

    DataReadyEvent.WaitOne(); // wait for value to be available
    // read the value
    Acknowledgement.Set();  // acknowledge receipt
    WaitForResultEvent.WaitOne(); // wait for signal to proceed
    

    这与每个等待线程有两个事件的效果相同,但更简单。

    但它确实有一个缺点,即如果线程崩溃,这将挂起倒计时事件。但是,如果生产者线程等待所有线程消息,您的方法也将如此。

    【讨论】:

      【解决方案3】:

      这很适合the Barrier class

      您可以使用两个Barriers 在两种状态之间进行翻转。

      这是一个例子:

      using System;
      using System.Threading;
      using System.Threading.Tasks;
      
      namespace Demo
      {
          internal class Program
          {
              private static void Main(string[] args)
              {
                  int readerCount = 4;
      
                  Barrier barrier1 = new Barrier(readerCount + 1);
                  Barrier barrier2 = new Barrier(readerCount + 1);
      
                  for (int i = 0; i < readerCount; ++i)
                  {
                      Task.Factory.StartNew(() => reader(barrier1, barrier2));
                  }
      
                  while (true)
                  {
                      barrier1.SignalAndWait(); // Wait for all threads to reach the "new data available" point.
      
                      if ((value % 10000) == 0)       // Print message every so often.
                          Console.WriteLine(value);
      
                      barrier2.SignalAndWait(); // Wait for the reader threads to read the current value.
                      ++value;                  // Produce the next value.
                  }
              }
      
              private static void reader(Barrier barrier1, Barrier barrier2)
              {
                  int expected = 0;
      
                  while (true)
                  {
                      barrier1.SignalAndWait(); // Wait for "new data available".
      
                      if (value != expected)
                      {
                          Console.WriteLine("Expected " + expected + ", got " + value);
                      }
      
                      ++expected;
                      barrier2.SignalAndWait();  // Signal that we've read the data, and wait for all other threads.
                  }
              }
      
              private static volatile int value;
          }
      }
      

      【讨论】:

        【解决方案4】:

        我会推荐 ConcurrentQueue - 它保证每个线程从队列中获取一个唯一的实例。 Here 很好地解释了如何使用它。

        ConnurrentQueue&lt;T&gt;.TryDequeue() 是一种线程安全的方法,用于检查队列是否为空以及是否从队列中获取项目。由于它同时执行这两个操作,因此程序员不必担心竞争条件。

        【讨论】:

        • 那么,写入器将新数据添加到队列中,然后读取器读取并删除该元素?
        • 但是作者怎么知道它应该添加多少元素到这个队列中呢?或者它会自动复制三个元素?
        【解决方案5】:

        我想我已经找到了方法。我创建了 2 个 AutoResetEvent 数组,每个读取器有 2 个事件,等待写入事件并设置读取事件,写入器设置所有写入事件并等待所有读取事件。

        JaredPar,你的回答很有用,对我有帮助

        【讨论】:

          猜你喜欢
          • 2010-12-12
          • 1970-01-01
          • 1970-01-01
          • 2011-08-24
          • 2015-05-05
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多