【问题标题】:Locking Performance & Alternatives锁定性能和替代方案
【发布时间】:2016-02-26 14:33:48
【问题描述】:

我有一个类,它非常频繁地将数据从多个线程推送到订阅的侦听器。如您所见,为了使类线程安全,我锁定了一个专用对象实例。问题是,在迭代列表时锁定会使类的吞吐量降低近 30%,而一个简单的订阅者会计算已发布值的吞吐量。

是否有另一种方法可以使该进程成为线程安全且开销可能更少?任何想法将不胜感激。

public abstract class Publisher<T> : IPublisher<T>
{
    protected readonly object InstanceLock = new object();
    protected readonly List<ISubscriber<T>> Subscribers = new List<ISubscriber<T>>();

    public virtual void Subscribe(ISubscriber<T> subscriber)
    {
        lock (InstanceLock)
        {
            if (!Subscribers.Contains(subscriber))
            {
                Subscribers.Add(subscriber);
            }
        }
    }
    public virtual void Unsubscribe(ISubscriber<T> subscriber)
    {
        lock (InstanceLock)
        {
            if (Subscribers.Contains(subscriber))
            {
                Subscribers.Remove(subscriber);
            }
        }
    }
    public virtual bool IsSubscribed(ISubscriber<T> subscriber)
    {
        lock (InstanceLock)
        {
            return Subscribers.Contains(subscriber);
        }
    }
    public virtual void Publish(T value)
    {
        lock (InstanceLock)
        {
            for (int i = 0; i < Subscribers.Count; i++) Subscribers[i].Record(value);
        }
    }
}

这是我用来衡量吞吐量的订阅者

public sealed class CountRateSubscriber : StatSubscriber<double>
{
    private DateTime _firstUpdated;
    private DateTime _lastUpdated;
    private int _count;

    public override void Record(double value)
    {
        lock (InstanceLock)
        {
            _count++;
            _lastUpdated = DateTime.UtcNow;

            if (_count == 1)
            {
                _firstUpdated = _lastUpdated;
            }
            else
            {
                _value = _count / (_lastUpdated - _firstUpdated).TotalSeconds;
            }

        }
    }
}

public abstract class StatSubscriber<T> : ValueSubscriber<double, T>
{
}

public abstract class ValueSubscriber<TIn, TOut> : ISubscriber<TIn>
{
    protected readonly object InstanceLock = new object();

    protected TOut _value;

    public TOut Value
    {
        get
        {
            lock (InstanceLock)
            {
                return _value;
            }
        }
    }

    public abstract void Record(TIn value);
}

编辑:所以我尝试了所有以下方法,它们都比仅使用 lock 慢:

  • Subscribers 复制到局部变量并且不锁定
  • ReaderWriterLockSlim
  • ConcurrentDictionary
  • -

【问题讨论】:

  • 也许你可以使用一个固定长度的数组作为订阅者缓冲区,然后使用互锁的方法,这样你就永远不会锁定缓冲区。
  • @Gusman 这可能是一个好主意,当数组的大小应该增加时可能应该有一些锁(即使用互锁将较小的数组交换为具有更多空闲插槽的其他数组.. .)
  • 好吧,我说的是“定长数组”,而不是“变长数组”,如果你需要增加缓冲区大小,那么肯定会有锁定。
  • 假设可以做些什么。但真正的问题是“您的 订阅者 线程安全吗”?即同时调用Record 是否安全。
  • @IvanStoev 是的,我已经为订阅者添加了代码,用于测量我的测试的吞吐量。

标签: c# multithreading performance locking publish-subscribe


【解决方案1】:

如果 Publish 方法是您的瓶颈,您可以以订阅中的一些额外开销为代价来锁定 Publish。基本上我们在订阅时复制列表,然后用副本替换列表。如果您关心订阅/取消订阅/发布的顺序,则存在竞争问题(如果发布尚未观察到列表更改,您可能会在调用取消订阅后收到记录调用),但这可能没问题。

public abstract class Publisher<T> : IPublisher<T>
{
    protected readonly object InstanceLock = new object();
    protected List<ISubscriber<T>> Subscribers = new List<ISubscriber<T>>();

    public virtual void Subscribe(ISubscriber<T> subscriber)
    {
        lock (InstanceLock)
        {
            var newSubscribers = Subscribers.ToList();
            if (!newSubscribers.Contains(subscriber))
            {
                newSubscribers.Add(subscriber);
                Subscribers = newSubscribers;
            }
        }
    }
    public virtual void Unsubscribe(ISubscriber<T> subscriber)
    {
        lock (InstanceLock)
        {
            var newSubscribers = Subscribers.ToList();
            if (newSubscribers.Contains(subscriber))
            {
                newSubscribers.Remove(subscriber);
                Subscribers = newSubscribers;
            }
        }
    }
    public virtual bool IsSubscribed(ISubscriber<T> subscriber)
    {
        lock (InstanceLock)
        {
            return Subscribers.Contains(subscriber);
        }
    }
    public virtual void Publish(T value)
    {
        var subscribers = Subscribers;
        for (int i = 0; i < subscribers.Count; i++) subscribers[i].Record(value);
    }
}

【讨论】:

  • 嗨,丹,感谢您的 cmets。不幸的是,我已经尝试在迭代之前将订阅者列表复制到局部变量,仅此一项就比锁定方法慢。我开始认为我对此无能为力。
  • @Ashigore,我很惊讶复制到局部变量会更慢。请注意,我们不是在此处复制列表本身,而只是引用。
  • 啊,当然,抱歉,我测试了创建列表的副本。我会试一试。谢谢。
【解决方案2】:

如果没有更多的范围,很难说出什么是可能的。所以我不得不猜测。
订阅者何时使用数据?是推送给他们还是他们在某个地方有自己的处理循环?

让作者一直接触到每一个读者是许多邪恶的根源,你不能指望它随着读者的数量而很好地扩展。让读者写任何东西(即使它“只是”一个锁)使他们成为作家。
代替 W 写入和 R 读取,您有 W+R 和 R 读取......很多缓存线乒乓......

如果我猜对了,并且读者有自己的处理线程,那么您可以共享来自发布者的数据,而不是将其推送给每个发布者。

您可以使用 seqlock - 写入器会通过旋转锁定直到 seq 为偶数,然后自动将 1 添加到 seq(可能在这里失败 - 重新启动进程),更新数据,最后通过添加另一个 1 来解锁序列,让它再次变得更平。
读取器线程将旋转直到 seq 为偶数,复制数据,检查 seq 是否为相同的偶数。如果没有,请再次旋转....

这将使读者以最快的方式获得最后一个值,因为他们没有任何东西可以锁定。它们是完全只读的。旧值丢失。
作家也将获得最快的路径。他们必须互相竞争,没有办法,但至少他们不必与读者竞争。

【讨论】:

    【解决方案3】:

    我们的想法是在发布数据时不要分发数据。

    让它成为一个多步骤的过程。

    发布 -> 将数据放在 CENTRAL 列表中。分发线程从该列表中读取数据,然后将数据发送给所有订阅者。

    随着订阅者的增多,这可能会成为瓶颈。如果是这种情况,请添加另一个步骤。发布 -> 主列表 -> 子列表(例如 1000 个订阅者)-> 最终分发

    这样可以避免过度锁定,更重要的是可以保持数据源的快速。

    现在,订阅者列表。忘记列表。使用数组数组。顺便说一句,子数组将是上面的子线程;)诀窍是-您锁定以进行添加和删除,但您不锁定以进行分发。添加或删除订阅者时,您分配一个新数组并将信息复制到其中。然后你 RPLACE 用于分发的数组。锁定以确保正确处理内存屏障。

    结果是订阅者数组永远不会发生变化 - 因为新列表替换了旧列表。

    这 - 显然 - 只有在 GC 和数组分配不是重要的时间消耗者时才有效。通常是这样的。

    我将它用于高度可扩展的交易应用程序。市场和交易更新由此分发,我永远不必因为添加/删除而锁定数据分发等待。

    【讨论】:

    • IMO 这意味着更多的读者和作者参与其中,因此会进一步减慢出版速度。问题不在于订阅者列表的变化,而是在不同的线程上有多个作者和多个读者。
    • 其实没有。首先,它确保发布者快速完成,无论有多少订阅者。这很关键。其次,它允许现代的非单核处理器使用多个线程来分发数据——其他任何事情都是愚蠢的。第三,它完全避免了对出版的真正锁定。我使用自旋锁来同步多个写入器,但除非分发列表更改,否则我会取消全锁。那里有成本 - 但在我的世界(以及大多数进行数据分发的人),这是一个罕见的事件(与数据分发相比)。
    • 诀窍始终是:(a) 使锁尽可能短,(b) 确实需要尽可能少的锁。我的解决方案实现了两者。是的,你可以投反对票——但除非你能想出一个更好的方案,否则这并不能说明你的智慧。我的目标是在一台体面的机器(今天是 16 个核心)上拥有 1000 多个活跃订阅者,延迟低于毫秒级,我无意不使用这些核心。数据方面,我可以处理数以万计的数据——但我的消费者目前住在同一台机器上,所以 CPU 使用率会上升一点。
    • 我认为这很有趣,你所说的,我会研究测试这样的实现,看看会发生什么,谢谢。
    • @TomTom,我绝对同意您必须使用所有内核。你添加给作者的核心是你从读者那里拿走的。谁说哪个更重要?
    猜你喜欢
    • 2018-11-09
    • 2011-12-07
    • 2011-05-24
    • 2019-11-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多