【发布时间】:2016-04-20 02:50:37
【问题描述】:
我正在使用 Azure 服务总线 SubscriptionClient.OnMessage 方法;配置为同时处理最多 5 条消息。
在代码中,我需要等待所有消息完成处理才能继续(正确关闭 Azure 辅助角色)。我该怎么做?
SubscriptionClient.Close() 会阻塞直到所有消息都处理完?
【问题讨论】:
标签: azure azureservicebus
我正在使用 Azure 服务总线 SubscriptionClient.OnMessage 方法;配置为同时处理最多 5 条消息。
在代码中,我需要等待所有消息完成处理才能继续(正确关闭 Azure 辅助角色)。我该怎么做?
SubscriptionClient.Close() 会阻塞直到所有消息都处理完?
【问题讨论】:
标签: azure azureservicebus
在 SubscriptionClient 或 QueueClient 上调用 Close 不会阻塞。据我所知,调用 Close 会立即关闭实体。我使用 Windows Azure SDK 2.0 附带的 Worker Role With Service Bus Queue 项目模板快速进行了测试。我在消息处理操作中添加了一个线程睡眠数秒,然后在角色运行时将其关闭。我看到当消息在它们的线程睡眠中处理时调用了 Close 方法,但它肯定没有等待消息处理完成,角色简单地关闭了。
要优雅地处理此问题,您需要执行与处理任何正在处理消息的工作角色(服务总线、Azure 存储队列或其他任何内容)相同的事情:跟踪正在处理的内容并关闭完成后放下。有几种方法可以解决这个问题,但由于涉及多个线程,所有这些方法都是手动的并且在这种情况下变得混乱。
鉴于 OnMessage 的工作方式,您需要在操作中添加一些内容,以查看角色是否已被告知关闭,如果是,则不进行任何处理。问题是,当执行 OnMessage 操作时,它已经有一条消息。您可能需要放弃消息但不退出 OnMessage 操作,否则如果队列中有消息,它将继续收到消息。您不能简单地放弃消息并让执行离开操作,因为这样系统将收到另一条消息(可能是同一条消息),并且执行此操作的多个线程可能会导致消息获得过多的出列计数并出现死信。此外,您不能在 SubscriptionClient 或 QueueClient 上调用 Close,这将在内部停止接收循环,因为一旦您调用 close,任何未完成的消息处理都会在调用 .Complete、.Abandon 等时引发异常消息,因为消息实体现在已关闭。这意味着您不能轻易阻止传入的消息。
这里的主要问题是因为您正在使用 OnMessage 并通过在 OnMessageOptions 上设置 MaxConcurrentCalls 来设置并发消息处理,这意味着启动和管理线程的代码隐藏在 QueueClient 和 SubscriptionClient 中,而您没有无法控制。您没有办法减少线程数,或单独停止线程等。您需要创建一种方法来将 OnMessage 操作线程置于他们知道系统被告知的状态关闭然后完成他们的消息并且不退出操作,以便他们不会不断地被分配新消息。这意味着您可能还需要将 MessageOptions 设置为不使用自动完成并在 OnMessage 操作中手动调用完成。
必须执行所有这些操作可能会严重降低使用 OnMessage 帮助程序的实际好处。在幕后 OnMessage 只是简单地设置一个循环调用接收,默认超时并将消息交给另一个线程来执行操作(松散描述)。因此,通过使用 OnMessage 方法,您不必自己编写该处理程序,但是您遇到的问题是因为您没有自己编写该处理程序,您无法控制这些线程。第 22 条军规。如果你真的需要优雅地停止,你可能想放弃 OnMessage 方法,编写你自己的带有线程的接收循环,并在主循环中停止接收新消息并等待所有工作人员结束。
一个选项,特别是如果消息是幂等的(这意味着多次处理它们会产生相同的结果......无论如何你都应该注意这一点),那么如果它们在处理过程中停止,它们只会重新出现在队列中稍后由另一个实例处理。如果工作本身不是资源密集型并且操作是幂等的,那么这确实是一种选择。与实例可能由于硬件故障或其他问题而失败时没有什么不同。当然,它既不优雅也不优雅,但它确实消除了我提到的所有复杂性,并且由于其他故障仍然可能发生。
请注意,当实例被告知关闭时会调用 OnStop。您有 5 分钟的时间可以延迟此操作,直到结构将其关闭,因此,如果您的消息处理时间超过 5 分钟,那么您是否尝试正常关闭并不重要,有些会被切断在处理过程中。
【讨论】:
您可以调整 OnMessageAsync 以等待消息处理完成,并阻止新消息开始处理:
这里是实现:
_subscriptionClient.OnMessageAsync(async message =>
{
if (_stopRequested)
{
// Block processing of new messages. We want to wait for old messages to complete and exit.
await Task.Delay(_waitForExecutionCompletionTimeout);
}
else
{
try
{
// Track executing messages
_activeTaskCollection[message.MessageId] = message;
await messageHandler(message);
await message.CompleteAsync();
}
catch (Exception e)
{
// handle error by disposing or doing nothing to force a retry
}
finally
{
BrokeredMessage savedMessage;
if (!_activeTaskCollection.TryRemove(message.MessageId, out savedMessage))
{
_logger.LogWarning("Attempt to remove message id {0} failed.", savedMessage.MessageId);
}
}
}
}, onMessageOptions);
还有一个等待完成的 Stop 实现:
public async Task Stop()
{
_stopRequested = true;
DateTime startWaitTime = DateTime.UtcNow;
while (DateTime.UtcNow - startWaitTime < _waitForExecutionCompletionTimeout && _activeTaskCollection.Count > 0)
{
await Task.Delay(_waitForExecutionCompletionSleepBetweenIterations);
}
await _subscriptionClient.CloseAsync();
}
请注意,_activeTaskCollection 是一个 ConcurrentDictionary(我们也可以使用带互锁的计数器来计算正在进行的消息的数量,但使用字典可以让您轻松调查发生错误时发生的情况。
【讨论】:
if (_messagePumpStoppingSource != null) { _log.Warning("Listener is closing. Deferring message..."); await _messagePumpStoppingSource.Task; return; } 一旦所有处理程序都在 Stop 方法中发出信号,我就有 _countdownEvent.Wait(TimeSpan.FromSeconds(30)); await Task.WhenAll(_clients.Select(x => x.CloseAsync())); _messagePumpStoppingSource.SetResult(true);