这种特殊情况有点进退两难,因为——正如您正确观察到的——SetNumberOfWorkers 是一个阻塞函数,它会一直等到达到所需的线程数。
在您的情况下,由于您将其设置为零,这意味着您的消息处理程序需要在线程数达到零之前完成......然后:?☠?
我很抱歉这么说,因为我敢打赌,您之所以愿意这样做是因为您不知何故陷入了困境——但总的来说,我必须说,想要按消息队列顺序处理消息是在乞求麻烦,因为有很多事情会导致消息被重新排序。
但是,我认为你可以通过安装一个传输装饰器来解决你的问题,它会在切换时绕过真正的传输。如果装饰器随后从 Receive 方法返回 null,它将触发 Rebus 的内置退避策略并开始冷却(即,它将增加轮询传输之间的等待时间)。
检查一下——首先,让我们创建一个简单的线程安全切换:
public class MessageHandlingToggle
{
public volatile bool ProcessMessages = true;
}
(您可能想以某种方式结束并制作漂亮,但现在应该这样做)
然后我们将它注册为容器中的单例(假设这里是 Microsoft DI):
services.AddSingleton(new MessageHandlingToggle());
我们将使用ProcessMessages 标志来指示是否应启用消息处理。
现在,当您配置 Rebus 时,您可以装饰传输并让装饰器访问容器中的切换实例:
services.AddRebus((configure, provider) =>
configure
.Transport(t => {
t.Use(...);
// install transport decorator here
t.Decorate(c => {
var transport = c.Get<ITransport>();
var toggle = provider.GetRequiredService<MessageHandlingToggle>();
return new MessageHandlingToggleTransportDecorator(transport, toggle);
})
})
.(...)
);
所以,现在您只需要构建装饰器:
public class MessageHandlingToggleTransportDecorator : ITransport
{
static readonly Task<TransportMessage> NoMessage = Task.FromResult(null);
readonly ITransport _transport;
readonly MessageHandlingToggle _toggle;
public MessageHandlingToggleTransportDecorator(ITransport transport, MessageHandlingToggle toggle)
{
_transport = transport;
_toggle = toggle;
}
public string Address => _transport.Address;
public void CreateQueue(string address) => _transport.CreateQueue(address);
public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
=> _transport.Send(destinationAddress, message, context);
public Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
=> _toggle.ProcessMessages
? _transport.Receive(context, cancellationToken)
: NoMessage;
}
如您所见,当ProcessMessages == false 时,它只会返回null。剩下的就是决定何时再次恢复处理消息,以某种方式从容器中拉出MessageHandlingToggle(可能是通过注入它),然后将bool 弹回true。
我希望对你有用,或者至少能给你一些关于如何解决问题的灵感。 ?