【发布时间】:2020-11-28 03:13:08
【问题描述】:
我有一个并发字典,用于管理订阅(以及与其他代码的链接),为了这个示例,显示了一个简化版本。
我需要在添加或删除主题时更新外部消息总线,而不是添加额外的同步机制并围绕字典添加和消息主题添加订阅包装一个阻塞部分,我正在使用并发字典并回调成功添加项目时的操作:
class SubscriptionManager
{
readonly ConcurrentDictionary<string, SubscribedTypes> subscriptions = new ConcurrentDictionary<string, SubscribedTypes>();
public void AddDynamic(string topic, Type type, Action<string> dynamicCallback)
{
subscriptions
.AddOrUpdate(
topic,
(topic) =>
{
var subscribedTypes = new SubscribedTypes(qos);
/* do some work and checks */
dynamicCallback(topic); // call back the calling code
return subscribedTypes;
},
(topic, subscribedTypes) =>
{
/* snip */
subscribedTypes.Add(type);
return subscribedTypes;
});
}
}
我的消息总线是纯异步的,客户端会调用如下:
subscriptionManager.AddDynamic("v1/health", HealthMessage, async (topic) => await messageBus.Subscribe(topic));
对原始问题的更新
匿名 lambda 甚至被称为异步函数调用吗? 回答 - 这不是一个好主意。消息总线可以被释放,所以要么通过单例的 IoC 生命周期管理来保证,要么在消息总线中实现 SubscriptionManager 实例
性能在这里至关重要,因为在审核时,某些应用会动态创建大量订阅,并且会不断阅读字典。
因此,我设置了一个 ReaderWriterLockSlim 锁,以最大限度地减少读取锁定(升级为写入完全锁定),并且当需要订阅消息总线时,它在临界区之外完成。
public class SubscriptionManager
{
// SubscribedTypes is a dictionary with some additional features
private readonly Dictionary<string, SubscribedTypes> dict = new Dictionary<string, SubscribedTypes>();
private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
private readonly IMessageBus messageBus;
SubscriptionManager(IMessageBus messageBus)
{
this.messageBus = messageBus;
}
public async Task AddDynamicAsync(string topic, Type type)
{
var subscriptionRequired = false;
WriteLock(() => {
if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
{
// just add - no need to subscribe, is already subscribed when created for first time
value.DynamicRegistrations.Add(handlerType);
}
else
{
// this topic is new, add it
var subscribedTypes = new SubscribedTypes(qos);
subscribedTypes.DynamicRegistrations.Add(handlerType);
this.dict[topic] = subscribedTypes;
// we will need to register this on external bus
subscriptionRequired = true;
}
});
if(subscriptionRequired)
await this.messageBus.Subscribe(topic);
}
public async Task RemoveDynamicAsync(string topic, Type type)
{
var unsubscribeRequired = false;
WriteLock(() => {
if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
{
value.DynamicRegistrations.Remove(handlerType);
unsubscribeRequired = true;
}
else
{
throw new InvalidOperationException($"topic '{topic}' not registered.");
}
});
if(unsubscribeRequired)
await this.messageBus.Unsubscribe(topic);
}
public IEnumerable<Type> GetTypesForTopic(string topic)
{
var found = ReadLock(() => {
if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
return value.GetAll();
else
return new List<Type>();
});
}
private SubscribedTypes ReadLock(Func<SubscribedTypes> func)
{
rwLock.EnterReadLock();
try { return func(); }
finally { rwLock.ExitReadLock(); }
}
private void WriteLock(Action action)
{
rwLock.EnterWriteLock();
try { action(); }
finally { rwLock.ExitWriteLock(); }
}
}
【问题讨论】:
-
首先,这不是这些参数的用途:docs.microsoft.com/en-us/dotnet/api/…。其次,虽然您不会看到死锁,但您可能会看到更糟糕的问题,因为这实际上是一种即发即弃的操作(并且您的
messageBus可能会在中途或什至在任务开始之前被丢弃,真的,那里可能发生任何事情) -
感谢卡米洛,我确信添加/更新的工厂都可以
public TValue AddOrUpdate (TKey key, Func<TKey,TValue> addValueFactory, Func<TKey,TValue,TValue> updateValueFactory);- 它们按预期工作(添加值)。在火上指出并忘记,我可以做些什么来重构它(即使完全写入一个包含消息总线和字典的自包含类)?在这种情况下,messageBus 是一个完全托管的包装器,因此永远不会被释放,它会在内部处理所有事情(包括订阅和启动)。 -
如果你不想要锁,这对我来说很有意义,你能用一个简单的 if 和
public boolean TryAdd的返回布尔值吗?它会让你打电话给TryAdd和TryUpdate。如果性能不重要,简单的锁是 IMO 最简单的方法
标签: c# concurrency async-await