【问题标题】:Make a C# implementation of a LinkedRingBuffer Thread Safe使 LinkedRingBuffer 线程安全的 C# 实现
【发布时间】:2016-03-06 13:11:01
【问题描述】:

我有三个问题:

  1. 您一般如何看待我解决给定问题的方法?
  2. 您认为我可以在哪些方面进一步提高性能?
  3. 最重要的一点:如何使我的实现真正线程安全?

起初我所处的简化场景: 我正在通过消息系统与不同的设备进行通信。我在相当短的时间内接收和发送成千上万条消息。我在一个多线程环境中,所以很多不同的任务正在发送和期待消息。对于消息接收,事件驱动的方法在使其线程安全的意义上给我们带来了很多麻烦。 我有一些 Receiver 任务从外部获取消息,并且必须将这些消息传递给许多消费者任务。

所以我想出了一个不同的方法: 为什么不拥有几千条消息的历史记录,其中每条新消息都排队,消费者任务可以从最新的项目向后搜索到最后处理的项目,以获得所有新到达的消息。当然,这必须是快速且线程安全的。

我提出了链接环形缓冲区的想法并实现了以下内容:

  public class LinkedRingBuffer<T>
  {
     private LinkedRingBufferNode<T> firstNode;
     private LinkedRingBufferNode<T> lastNode;

     public LinkedRingBuffer(int capacity)
     {
        Capacity = capacity;
        Count = 0;
     }

     /// <summary>
     /// Maximum count of items inside the buffer
     /// </summary>
     public int Capacity { get; }

     /// <summary>
     /// Actual count of items inside the buffer
     /// </summary>
     public int Count { get; private set; }

     /// <summary>
     /// Get value of the oldest buffer entry
     /// </summary>
     /// <returns></returns>
     public T GetFirst()
     {
        return firstNode.Item;
     }

     /// <summary>
     /// Get value of the newest buffer entry
     /// </summary>
     /// <returns></returns>
     public T GetLast()
     {
        return lastNode.Item;
     }

     /// <summary>
     /// Add item at the end of the buffer. 
     /// If capacity is reached the link to the oldest item is deleted.
     /// </summary>
     public void Add(T item)
     {
        /* create node and set to last one */
        var node = new LinkedRingBufferNode<T>(lastNode, item);
        lastNode = node;
        /* if it is the first node, the created is also the first */
        if (firstNode == null)
           firstNode = node;
        /* check for capacity reach */
        Count++;
        if(Count > Capacity)
        {/* deleted all links to the current first so that its eventually gc collected */
           Count = Capacity;
           firstNode = firstNode.NextNode;
           firstNode.PreviousNode = null;
        }
     }

     /// <summary>
     /// Iterate through the buffer from the oldest to the newest item
     /// </summary>
     public IEnumerable<T> LastToFirst()
     {
        var current = lastNode;
        while(current != null)
        {
           yield return current.Item;
           current = current.PreviousNode;
        }
     }

     /// <summary>
     /// Iterate through the buffer from the newest to the oldest item
     /// </summary>
     public IEnumerable<T> FirstToLast()
     {
        var current = firstNode;
        while (current != null)
        {
           yield return current.Item;
           current = current.NextNode;
        }
     }

     /// <summary>
     /// Iterate through the buffer from the oldest to given item. 
     /// If item doesn't exist it iterates until it reaches the newest
     /// </summary>
     public IEnumerable<T> LastToReference(T item)
     {
        var current = lastNode;
        while (current != null)
        {
           yield return current.Item;
           if (current.Item.Equals(item))
              break;
           current = current.PreviousNode;
        }
     }

     /// <summary>
     /// Iterate through the buffer from the newest to given item. 
     /// If item doesn't exist it iterates until it reaches the oldest
     /// </summary>
     public IEnumerable<T> FirstToReference(T item)
     {
        var current = firstNode;
        while (current != null)
        {
           yield return current.Item;
           if (current.Item.Equals(item))
              break;
           current = current.PreviousNode;
        }
     }

     /// <summary>
     /// Represents a linked node inside the buffer and holds the data
     /// </summary>
     private class LinkedRingBufferNode<A>
     {
        public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item)
        {
           Item = item;
           NextNode = null;
           PreviousNode = previousNode;
           if(previousNode != null)
              previousNode.NextNode = this;
        }
        internal A Item { get; }
        internal LinkedRingBufferNode<A> PreviousNode { get; set; }
        internal LinkedRingBufferNode<A> NextNode { get; private set; }
     }
  }

但不幸的是,我对多线程环境有点陌生,那么如何让这个缓冲区线程对多次读写安全?

谢谢!

【问题讨论】:

    标签: c# performance linked-list thread-safety buffer


    【解决方案1】:

    我认为最简单的方法是在执行线程关键代码时使用object 同步locklock 块中的代码称为critical section,一次只能由一个线程访问。任何其他希望访问它的线程都会等待,直到锁被释放。

    定义和初始化:

    private object Synchro;
    
    public LinkedRingBuffer(int capacity)
    {
        Synchro = new object();
        // Other constructor code
    }
    

    用法:

    public T GetFirst()
    {
        lock(Synchro)
        {
            return firstNode.Item;
        }
    }
    

    在编写线程安全代码时,locking 某些部分可能看起来很明显。但是,如果您不确定是否要lock 一个语句或代码块,则为了读写安全,您需要考虑:

    • 此代码是否会影响任何其他锁定的关键部分的行为或结果。
    • 任何其他锁定的关键部分是否会影响此代码的行为或结果。

    您还需要重写一些自动实现的属性以获得支持字段。但是应该很简单...

    您对yield return 的使用虽然在单线程上下文中非常聪明和高效,但在多线程上下文中会引起麻烦。这是因为yield return doesn't release a lock statement (它不应该)。无论您使用yield return,您都必须在包装器中执行具体化。

    您的线程安全代码如下所示:

    public class LinkedRingBuffer<T>
    {
        private LinkedRingBufferNode<T> firstNode;
        private LinkedRingBufferNode<T> lastNode;
        private object Synchro;
    
        public LinkedRingBuffer(int capacity)
        {
            Synchro = new object();
            Capacity = capacity;
            Count = 0;
        }
    
        /// <summary>
        /// Maximum count of items inside the buffer
        /// </summary>
        public int Capacity { get; }
    
        /// <summary>
        /// Actual count of items inside the buffer
        /// </summary>
        public int Count
        {
            get
            {
                lock (Synchro)
                {
                    return _count;
                }
            }
            private set
            {
                _count = value;
            }
        }
        private int _count;
    
        /// <summary>
        /// Get value of the oldest buffer entry
        /// </summary>
        /// <returns></returns>
        public T GetFirst()
        {
            lock (Synchro)
            {
                return firstNode.Item;
            }
        }
    
        /// <summary>
        /// Get value of the newest buffer entry
        /// </summary>
        /// <returns></returns>
        public T GetLast()
        {
            lock (Synchro)
            {
                return lastNode.Item;
            }
        }
    
        /// <summary>
        /// Add item at the end of the buffer. 
        /// If capacity is reached the link to the oldest item is deleted.
        /// </summary>
        public void Add(T item)
        {
            lock (Synchro)
            {
                /* create node and set to last one */
                var node = new LinkedRingBufferNode<T>(lastNode, item);
                lastNode = node;
                /* if it is the first node, the created is also the first */
                if (firstNode == null)
                    firstNode = node;
                /* check for capacity reach */
                Count++;
                if (Count > Capacity)
                {
                    /* deleted all links to the current first so that its eventually gc collected */
                    Count = Capacity;
                    firstNode = firstNode.NextNode;
                    firstNode.PreviousNode = null;
                }
            }
        }
    
        /// <summary>
        /// Iterate through the buffer from the oldest to the newest item
        /// </summary>
        public IEnumerable<T> LastToFirst()
        {
            lock (Synchro)
            {
                var materialized = LastToFirstInner().ToList();
                return materialized;
            }
        }
    
        private IEnumerable<T> LastToFirstInner()
        {
            var current = lastNode;
            while (current != null)
            {
                yield return current.Item;
                current = current.PreviousNode;
            }
        }
    
        /// <summary>
        /// Iterate through the buffer from the newest to the oldest item
        /// </summary>
        public IEnumerable<T> FirstToLast()
        {
            lock (Synchro)
            {
                var materialized = FirstToLastInner().ToList();
                return materialized;
            }
        }
    
        private IEnumerable<T> FirstToLastInner()
        {
            var current = firstNode;
            while (current != null)
            {
                yield return current.Item;
                current = current.NextNode;
            }
        }
    
        /// <summary>
        /// Iterate through the buffer from the oldest to given item. 
        /// If item doesn't exist it iterates until it reaches the newest
        /// </summary>
        public IEnumerable<T> LastToReference(T item)
        {
            lock (Synchro)
            {
                var materialized = LastToReferenceInner(item).ToList();
                return materialized;
            }
        }
    
        private IEnumerable<T> LastToReferenceInner(T item)
        {
            var current = lastNode;
            while (current != null)
            {
                yield return current.Item;
                if (current.Item.Equals(item))
                    break;
                current = current.PreviousNode;
            }
        }
    
        /// <summary>
        /// Iterate through the buffer from the newest to given item. 
        /// If item doesn't exist it iterates until it reaches the oldest
        /// </summary>
        public IEnumerable<T> FirstToReference(T item)
        {
            lock (Synchro)
            {
                var materialized = FirstToReferenceInner(item).ToList();
                return materialized;
            }
        }
    
        private IEnumerable<T> FirstToReferenceInner(T item)
        {
            var current = firstNode;
            while (current != null)
            {
                yield return current.Item;
                if (current.Item.Equals(item))
                    break;
                current = current.PreviousNode;
            }
        }
    
        /// <summary>
        /// Represents a linked node inside the buffer and holds the data
        /// </summary>
        private class LinkedRingBufferNode<A>
        {
            public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item)
            {
                Item = item;
                NextNode = null;
                PreviousNode = previousNode;
                if (previousNode != null)
                    previousNode.NextNode = this;
            }
            internal A Item { get; }
            internal LinkedRingBufferNode<A> PreviousNode { get; set; }
            internal LinkedRingBufferNode<A> NextNode { get; private set; }
        }
    }
    

    可以进行一些优化,例如您不需要在临界区中创建 LinkedRingBufferNode 对象,但是您必须将 lastNode 值复制到临界区内的局部变量中,之前创建对象。

    【讨论】:

    • 感谢您的快速回复!因此,当使用其中一个迭代器时,这种方法 ToList 调用将创建一个全新的列表(同步),之后可以在 foreach 中进行迭代,对吗?还是锁阻塞了整个foreach迭代过程?我还担心 ToList() 调用会对包含数千个项目的缓冲区产生巨大的性能影响,但我明天将运行一个基准测试。也许只是忘记 foreach 并使用锁定的 GetPrevious()/GetNext() 调用手动搜索列表会更快。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-06-25
    • 1970-01-01
    • 1970-01-01
    • 2010-10-01
    • 1970-01-01
    • 2013-05-30
    • 1970-01-01
    相关资源
    最近更新 更多