【发布时间】:2016-02-26 14:33:48
【问题描述】:
我有一个类,它非常频繁地将数据从多个线程推送到订阅的侦听器。如您所见,为了使类线程安全,我锁定了一个专用对象实例。问题是,在迭代列表时锁定会使类的吞吐量降低近 30%,而一个简单的订阅者会计算已发布值的吞吐量。
是否有另一种方法可以使该进程成为线程安全且开销可能更少?任何想法将不胜感激。
public abstract class Publisher<T> : IPublisher<T>
{
protected readonly object InstanceLock = new object();
protected readonly List<ISubscriber<T>> Subscribers = new List<ISubscriber<T>>();
public virtual void Subscribe(ISubscriber<T> subscriber)
{
lock (InstanceLock)
{
if (!Subscribers.Contains(subscriber))
{
Subscribers.Add(subscriber);
}
}
}
public virtual void Unsubscribe(ISubscriber<T> subscriber)
{
lock (InstanceLock)
{
if (Subscribers.Contains(subscriber))
{
Subscribers.Remove(subscriber);
}
}
}
public virtual bool IsSubscribed(ISubscriber<T> subscriber)
{
lock (InstanceLock)
{
return Subscribers.Contains(subscriber);
}
}
public virtual void Publish(T value)
{
lock (InstanceLock)
{
for (int i = 0; i < Subscribers.Count; i++) Subscribers[i].Record(value);
}
}
}
这是我用来衡量吞吐量的订阅者
public sealed class CountRateSubscriber : StatSubscriber<double>
{
private DateTime _firstUpdated;
private DateTime _lastUpdated;
private int _count;
public override void Record(double value)
{
lock (InstanceLock)
{
_count++;
_lastUpdated = DateTime.UtcNow;
if (_count == 1)
{
_firstUpdated = _lastUpdated;
}
else
{
_value = _count / (_lastUpdated - _firstUpdated).TotalSeconds;
}
}
}
}
public abstract class StatSubscriber<T> : ValueSubscriber<double, T>
{
}
public abstract class ValueSubscriber<TIn, TOut> : ISubscriber<TIn>
{
protected readonly object InstanceLock = new object();
protected TOut _value;
public TOut Value
{
get
{
lock (InstanceLock)
{
return _value;
}
}
}
public abstract void Record(TIn value);
}
编辑:所以我尝试了所有以下方法,它们都比仅使用 lock 慢:
- 将
Subscribers复制到局部变量并且不锁定 ReaderWriterLockSlimConcurrentDictionary
-
【问题讨论】:
-
也许你可以使用一个固定长度的数组作为订阅者缓冲区,然后使用互锁的方法,这样你就永远不会锁定缓冲区。
-
@Gusman 这可能是一个好主意,当数组的大小应该增加时可能应该有一些锁(即使用互锁将较小的数组交换为具有更多空闲插槽的其他数组.. .)
-
好吧,我说的是“定长数组”,而不是“变长数组”,如果你需要增加缓冲区大小,那么肯定会有锁定。
-
假设可以做些什么。但真正的问题是“您的 订阅者 线程安全吗”?即同时调用
Record是否安全。 -
@IvanStoev 是的,我已经为订阅者添加了代码,用于测量我的测试的吞吐量。
标签: c# multithreading performance locking publish-subscribe