【问题标题】:Creating a message bus with TPL Dataflow使用 TPL 数据流创建消息总线
【发布时间】:2012-12-31 01:49:45
【问题描述】:

我一直在寻找一种轻量级、进程中的异步消息总线,结果遇到了 TPL 数据流。

我当前的实现如下(完整示例在https://gist.github.com/4416655)。

public class Bus
{
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    public Task SendAsync<TMessage>(TMessage message)
    {
        return SendAsync<TMessage>(message, CancellationToken.None);
    }

    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        return broadcast.SendAsync(message, cancellationToken);
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

        var subscription = broadcast.LinkTo(handler, 
            new DataflowLinkOptions { PropagateCompletion = true }, 
            message => message is TMessage);

        return AddSubscription(subscription);
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))
        {
            subscription.Dispose();
        }
    }

    private Guid AddSubscription(IDisposable subscription)
    {
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;
    }
}

我有几个关于在消息传递场景中使用 TPL 数据流的一般性问题。

  • BroadcastBlock&lt;T&gt; 是同时向多个处理程序发送消息的推荐来源吗?这是我根据this post得出的结论。
  • 在我的实现中,我对所有消息类型都使用了一个BroadcastBlock&lt;T&gt; 实例。在处理大量消息时这会导致问题吗?我应该为每种消息类型创建一个单独的实例吗?
  • BroadcastBlock&lt;T&gt; 始终存储最后发送的项目。这意味着任何新的订阅(链接)都将自动传递此消息。是否可以更改此行为(新订阅应该只接收 new 消息)。
  • 在我的测试应用程序中,我在第一个处理程序中引入了延迟:

        // Subscribe to Message type
        var subscription1 = bus.Subscribe<Message>(async m => { 
            await Task.Delay(2000);
            Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
        });
    

    在发送消息时,我希望看到每条消息一一输出到控制台,以 2s 为增量。相反,在 2 秒后,所有消息都立即输出。我假设这是由于底层调度程序执行的并行性,但我很好奇如何更改这些设置(设置 MaxDegreeOfParallelism = 1 没有区别)。

  • 最后,虽然SendAsync 允许我等待消息的发送,但它不允许我等待目标完成(ActionBlock&lt;T&gt;)。我认为这就是PropagateCompletion 会做的事情,但似乎并非如此。理想情况下,我想知道消息的所有处理程序何时都已执行。

更新

Task.Delay 没有得到预期行为的原因是,这延迟了 每个 处理程序的执行,而不是 所有 处理程序的处理。 Thread.Sleep 是我需要的。

【问题讨论】:

    标签: .net task-parallel-library tpl-dataflow


    【解决方案1】:

    在回答了您的问题(见下文)后,我意识到使用 TPL Dataflow 模块对您的设计进行建模可能不是一个好主意。 TDF 有利于通过很大程度上独立的块来处理消息,而没有内置的方法来跟踪单个消息。但这似乎是您想要的:由处理程序按顺序处理消息,并跟踪每条消息的完成情况。

    因此,我认为您不应该创建一个完整的数据流网络,而应使用单个 ActionBlock 作为异步消息处理器:

    public class Bus
    {
        class Subscription
        {
            public Guid Id { get; private set; }
            public Func<object, Task> HandlerAction { get; private set; }
    
            public Subscription(Guid id, Func<object, Task> handlerAction)
            {
                Id = id;
                HandlerAction = handlerAction;
            }
        }
    
        private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
        private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();
    
        private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;
    
        public Bus()
        {
            // subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
            var subscriptions = new List<Subscription>();
    
            m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
                async tuple =>
                {
                    var message = tuple.Item1;
                    var completedAction = tuple.Item2;
    
                    // could be made more efficient, probably doesn't matter
                    Guid idToUnsubscribe;
                    while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
                    {
                        subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
                    }
    
                    Subscription handlerToSubscribe;
                    while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
                    {
                        subscriptions.Add(handlerToSubscribe);
                    }
    
                    foreach (var subscription in subscriptions)
                    {
                        await subscription.HandlerAction(message);
                    }
    
                    completedAction();
                });
        }
    
        public Task SendAsync<TMessage>(TMessage message)
        {
            var tcs = new TaskCompletionSource<bool>();
            Action completedAction = () => tcs.SetResult(true);
    
            m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));
    
            return tcs.Task;
        }
    
        public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
        {
            return Subscribe<TMessage>(
                message =>
                {
                    handlerAction(message);
                    // we need a completed non-generic Task; this is a simple, efficient way to get it
                    // another option would be to use async lambda with no await,
                    // but that's less efficient and produces a warning
                    return Task.FromResult(false);
                });
        }
    
        public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
        {
            Func<object, Task> actionWithCheck = async message =>
            {
                if (message is TMessage)
                    await handlerAction((TMessage)message);
            };
    
            var id = Guid.NewGuid();
            m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
            return id;
        }
    
        public void Unsubscribe(Guid subscriptionId)
        {
            m_idsToUnsubscribe.Enqueue(subscriptionId);
        }
    }
    

    (我决定使用队列来订阅和取消订阅,以便处理消息时处理程序列表不会改变。)

    回答您的问题

    BroadcastBlock&lt;T&gt; 是同时向多个处理程序发送消息的推荐来源吗?

    是的,乍一看,BroadcastBlock&lt;T&gt; 就是你想要的。在 TPL 数据流中肯定没有任何类似的块。

    在我的实现中,我对所有消息类型使用单个 BroadcastBlock 实例。在处理大量消息时这会导致问题吗?我应该为每种消息类型创建一个单独的实例吗?

    对于所有消息类型的单个块,您可以在单个线程上完成更多工作(发送到所有处理程序)。对于每种消息类型使用一个块,您将做更少的工作(仅发送到正确的处理程序),这些工作可以在多个线程上执行。因此,我认为假设后者会更快是合理的。

    但不要忘记应用程序性能优化的规则:首先,编写简单易读的代码。仅当事实证明它实际上很慢时,才尝试对其进行优化。在比较两种备选方案时,始终使用分析来确定哪一种实际上更快,不要只是猜测哪一种应该更快。

    BroadcastBlock&lt;T&gt; 始终存储最后发送的项目。这意味着任何新的订阅(链接)都将自动传递此消息。是否可以改变这种行为(新订阅应该只接收 new 消息)?

    不,没有办法配置 BroadcastBlock&lt;T&gt; 来做到这一点。如果您不需要BroadcastBlock&lt;T&gt; 的所有功能(发送到容量有限的块,可能暂时已满,支持非贪婪块作为目标),您可能需要编写BroadcastBlock&lt;T&gt; 的自定义版本来执行此操作.

    当发送一条消息时,我希望看到每条消息一条一条地输出到控制台,以 2s 为增量。相反,在 2 秒后,所有消息都立即输出。我假设这是由于底层调度程序执行的并行性,但我很好奇如何更改这些设置(设置 MaxDegreeOfParallelism = 1 没有区别)。

    TDF的一个要点是每个block都是独立的,所以多个block可以在多个线程上执行。如果这不是您想要的,那么可能为每个处理程序使用单独的 ActionBlock&lt;T&gt; 可能不是最好的解决方案。事实上,TDF 可能根本不是最好的解决方案。

    另外,Subscribe() 接受 Action&lt;TMessage&gt;,这意味着您的 lambda 将被编译为 async void 方法。这些应该只在特定(且相对罕见)的情况下使用,在这种情况下您没有其他选择。如果你想支持async 处理程序,你应该接受async Task 方法,即Func&lt;TMessage, Task&gt;

    Task.Delay 没有得到预期行为的原因是,这延迟了 每个 处理程序的执行,而不是 所有 处理程序的处理。 Thread.Sleep 是我需要的。

    使用Thread.Sleep() 违背了异步的整个理念,如果可能的话,你不应该使用它。另外,我认为它实际上并没有按照您想要的方式工作:它为每个线程引入了延迟,但 TPL 数据流将使用多个线程,因此这不会像您预期的那样运行。

    最后,虽然SendAsync 允许我等待消息的发送,但它不允许我等待目标的完成(ActionBlock&lt;T&gt;)。我认为这就是PropagateCompletion 会做的事情,但似乎并非如此。理想情况下,我想知道消息的所有处理程序何时执行。

    PropagateCompletionComplete()Completion 一起用于处理整个块的完成,而不是处理单个消息。原因之一是更复杂的数据流网络,可能不清楚何时处理消息。例如,如果一条消息已经发送到 BroadcastBlock&lt;T&gt; 的所有当前目标,但也会发送到所有新添加的目标,是否应该认为它已完成?

    如果您想这样做,则必须以某种方式手动进行,可能使用TaskCompletionSource

    【讨论】:

    • 很好的解决方案。我特别喜欢订阅/取消订阅队列背后的想法。一个问题,您将如何扩展它以支持传递取消令牌以停止处理程序的执行?
    • 如果您想取消对单个消息的处理,我会将cancellationTokencanceledAction 添加到Tuple(这意味着自定义类可能比Tuple 更好那一点)。您可以在SendAsync() 中设置它们并在subscritpions 循环中使用它们。理想情况下,handlerAction 也应该接受令牌(至少是可选的)。
    • 谢谢。我很好奇,为什么不使用返回任务的TransformBlock。然后我们可以只返回Task.WhenAll(subscriptions) 而不是使用TaskCompletionSource
    • 我不确定我是否理解。 TransformBlock 仍然不会直接返回某些输入的输出,您需要以某种方式接收其结果。你当然可以使用TransformBlock做你想做的事,但我认为它实际上会比使用ActionBlockTaskCompletionSource更复杂。
    • 好的,再次感谢。仅供参考,我已将此代码推送到 GitHub。 github.com/benfoster/Fabrik.SimpleBus
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-10-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-24
    • 1970-01-01
    相关资源
    最近更新 更多