【问题标题】:Calling async callback from ConcurrentDictionary - potential deadlock?从 ConcurrentDictionary 调用异步回调 - 潜在的死锁?
【发布时间】:2020-11-28 03:13:08
【问题描述】:

我有一个并发字典,用于管理订阅(以及与其他代码的链接),为了这个示例,显示了一个简化版本。

我需要在添加或删除主题时更新外部消息总线,而不是添加额外的同步机制并围绕字典添加和消息主题添加订阅包装一个阻塞部分,我正在使用并发字典并回调成功添加项目时的操作:

class SubscriptionManager
{
  readonly ConcurrentDictionary<string, SubscribedTypes> subscriptions = new ConcurrentDictionary<string, SubscribedTypes>();

   public void AddDynamic(string topic, Type type, Action<string> dynamicCallback)
   {
      subscriptions
          .AddOrUpdate(
                    topic,
                    (topic) =>
                    {
                        var subscribedTypes = new SubscribedTypes(qos);
                        /* do some work and checks */

                        dynamicCallback(topic); // call back the calling code

                        return subscribedTypes;
                    },
                    (topic, subscribedTypes) =>
                    {
                        /* snip */
                        subscribedTypes.Add(type);
                        return subscribedTypes;
                    });
   }
}

我的消息总线是纯异步的,客户端会调用如下:

subscriptionManager.AddDynamic("v1/health", HealthMessage, async (topic) => await messageBus.Subscribe(topic));

对原始问题的更新

匿名 lambda 甚至被称为异步函数调用吗? 回答 - 这不是一个好主意。消息总线可以被释放,所以要么通过单例的 IoC 生命周期管理来保证,要么在消息总线中实现 SubscriptionManager 实例

性能在这里至关重要,因为在审核时,某些应用会动态创建大量订阅,并且会不断阅读字典。

因此,我设置了一个 ReaderWriterLockSlim 锁,以最大限度地减少读取锁定(升级为写入完全锁定),并且当需要订阅消息总线时,它在临界区之外完成。

public class SubscriptionManager
{
    // SubscribedTypes is a dictionary with some additional features
    private readonly Dictionary<string, SubscribedTypes> dict = new Dictionary<string, SubscribedTypes>();
    private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
    private readonly IMessageBus messageBus;
    
    SubscriptionManager(IMessageBus messageBus)
    {
        this.messageBus = messageBus;
    }

    public async Task AddDynamicAsync(string topic, Type type)
    {
        var subscriptionRequired = false;
        
        WriteLock(() => {
            if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
            {
                // just add - no need to subscribe, is already subscribed when created for first time
                value.DynamicRegistrations.Add(handlerType);                
            }
            else
            {
                // this topic is new, add it
                var subscribedTypes = new SubscribedTypes(qos);
                subscribedTypes.DynamicRegistrations.Add(handlerType);
                this.dict[topic] = subscribedTypes;

                // we will need to register this on external bus
                subscriptionRequired = true;
            }           
        });
        
        if(subscriptionRequired)
            await this.messageBus.Subscribe(topic);
    }
    
    public async Task RemoveDynamicAsync(string topic, Type type)
    {
        var unsubscribeRequired = false;
    
        WriteLock(() => {
            if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
            {
                value.DynamicRegistrations.Remove(handlerType);             
                unsubscribeRequired = true;
            }
            else
            {
                throw new InvalidOperationException($"topic '{topic}' not registered.");
            }       
        });
        
        if(unsubscribeRequired)
            await this.messageBus.Unsubscribe(topic);
    }
    
    public IEnumerable<Type> GetTypesForTopic(string topic)
    {
        var found = ReadLock(() => {
            if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
                return value.GetAll();  
            else
                return new List<Type>();
        });
    }

    private SubscribedTypes ReadLock(Func<SubscribedTypes> func)
    {
        rwLock.EnterReadLock();
        try { return func(); }
        finally { rwLock.ExitReadLock(); }
    }

    private void WriteLock(Action action)
    {
        rwLock.EnterWriteLock();
        try { action(); }
        finally { rwLock.ExitWriteLock(); }
    }               
}

【问题讨论】:

  • 首先,这不是这些参数的用途:docs.microsoft.com/en-us/dotnet/api/…。其次,虽然您不会看到死锁,但您可能会看到更糟糕的问题,因为这实际上是一种即发即弃的操作(并且您的 messageBus 可能会在中途或什至在任务开始之前被丢弃,真的,那里可能发生任何事情)
  • 感谢卡米洛,我确信添加/更新的工厂都可以public TValue AddOrUpdate (TKey key, Func&lt;TKey,TValue&gt; addValueFactory, Func&lt;TKey,TValue,TValue&gt; updateValueFactory); - 它们按预期工作(添加值)。在火上指出并忘记,我可以做些什么来重构它(即使完全写入一个包含消息总线和字典的自包含类)?在这种情况下,messageBus 是一个完全托管的包装器,因此永远不会被释放,它会在内部处理所有事情(包括订阅和启动)。
  • 如果你不想要锁,这对我来说很有意义,你能用一个简单的 if 和 public boolean TryAdd 的返回布尔值吗?它会让你打电话给TryAddTryUpdate。如果性能不重要,简单的锁是 IMO 最简单的方法

标签: c# concurrency async-await


【解决方案1】:

您使用ConcurrentDictionary 的第一种方法是有问题的,因为根据documentation

如果您在不同线程上同时调用AddOrUpdateaddValueFactory 可能会被多次调用,但它的键/值对可能不会在每次调用时都添加到字典中。

[...] addValueFactoryupdateValueFactory 委托在锁外调用,以避免在锁下执行未知代码可能出现的问题。因此,AddOrUpdate 对于ConcurrentDictionary&lt;TKey,TValue&gt; 类上的所有其他操作而言不是原子的。

您使用ConcurrentDictionary 的方式表明您希望每次调用AddOrUpdate 时最多执行一次所提供的lambda,但这不能保证。

您使用普通Dictionary+lock 的第二种方法将无法编译,因为you are not allowed to await inside a lock block。您必须将await messageBus.Subscribe(topic); 移出受保护区域,或使用SemaphoreSlim 之类的异步限制器。

【讨论】:

  • 感谢您的回复,我建议作为问题的更新使用ReaderWriterLockSlim 重写,然后在循环外执行慢速消息总线订阅(正在考虑将这些订阅/取消订阅请求排队在后台工作线程上,但给定 async 看没有任何好处) - 你觉得这更合适吗(随意将代码复制/粘贴到答案中并根据需要进行调整,我不确定是否正确)感谢您的帮助
  • @morleyc 抱歉,您的代码涉及很多,我没有太多时间深入研究它。我只是提出一个想法,考虑使用不可变集合作为订阅的容器。这些集合几乎不需要同步。您可以查看使用它们的示例here
猜你喜欢
  • 1970-01-01
  • 2020-01-12
  • 2015-12-19
  • 2017-02-18
  • 2019-01-13
  • 1970-01-01
  • 1970-01-01
  • 2017-01-05
  • 2014-09-12
相关资源
最近更新 更多