【发布时间】:2012-12-31 01:49:45
【问题描述】:
我一直在寻找一种轻量级、进程中的异步消息总线,结果遇到了 TPL 数据流。
我当前的实现如下(完整示例在https://gist.github.com/4416655)。
public class Bus
{
private readonly BroadcastBlock<object> broadcast =
new BroadcastBlock<object>(message => message);
private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
= new ConcurrentDictionary<Guid, IDisposable>();
public Task SendAsync<TMessage>(TMessage message)
{
return SendAsync<TMessage>(message, CancellationToken.None);
}
public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
{
return broadcast.SendAsync(message, cancellationToken);
}
public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
{
var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));
var subscription = broadcast.LinkTo(handler,
new DataflowLinkOptions { PropagateCompletion = true },
message => message is TMessage);
return AddSubscription(subscription);
}
public void Unsubscribe(Guid subscriptionId)
{
IDisposable subscription;
if (subscriptions.TryRemove(subscriptionId, out subscription))
{
subscription.Dispose();
}
}
private Guid AddSubscription(IDisposable subscription)
{
var subscriptionId = Guid.NewGuid();
subscriptions.TryAdd(subscriptionId, subscription);
return subscriptionId;
}
}
我有几个关于在消息传递场景中使用 TPL 数据流的一般性问题。
-
BroadcastBlock<T>是同时向多个处理程序发送消息的推荐来源吗?这是我根据this post得出的结论。 - 在我的实现中,我对所有消息类型都使用了一个
BroadcastBlock<T>实例。在处理大量消息时这会导致问题吗?我应该为每种消息类型创建一个单独的实例吗? -
BroadcastBlock<T>始终存储最后发送的项目。这意味着任何新的订阅(链接)都将自动传递此消息。是否可以更改此行为(新订阅应该只接收 new 消息)。 -
在我的测试应用程序中,我在第一个处理程序中引入了延迟:
// Subscribe to Message type var subscription1 = bus.Subscribe<Message>(async m => { await Task.Delay(2000); Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content); });在发送消息时,我希望看到每条消息一一输出到控制台,以 2s 为增量。相反,在 2 秒后,所有消息都立即输出。我假设这是由于底层调度程序执行的并行性,但我很好奇如何更改这些设置(设置MaxDegreeOfParallelism = 1没有区别)。 最后,虽然
SendAsync允许我等待消息的发送,但它不允许我等待目标完成(ActionBlock<T>)。我认为这就是PropagateCompletion会做的事情,但似乎并非如此。理想情况下,我想知道消息的所有处理程序何时都已执行。
更新
Task.Delay 没有得到预期行为的原因是,这延迟了 每个 处理程序的执行,而不是 所有 处理程序的处理。 Thread.Sleep 是我需要的。
【问题讨论】:
标签: .net task-parallel-library tpl-dataflow