除了 Sean 的回答之外,Azure 服务总线与 AEG 的集成还可以为 ASB 实体构建一些看门狗功能。请注意,此集成与存储 blob 帐户不同,每次创建/删除 blob 时都会发布事件。
换句话说,ASB 不会为到达 ASB 实体的每条消息发布事件,这些事件像实体看门狗一样发布。
这种实体看门狗使用如下逻辑:
- 当实体中没有消息时,不会发布任何事件。
- 当第一条消息到达实体并且实体上 360+ 秒内没有活动侦听器时立即发布事件
- 当侦听器仍处于非活动状态且实体中至少有一条消息时,该事件每 120 秒发布一次
- 事件在 360 秒侦听器空闲(非活动)时间后发布,但实体中仍然至少有一条消息。例如,如果我们在实体中有 5 条消息,并且订阅者将使用 REST Api 仅拉出一条消息,则下一个事件将在 360 秒后发布。换句话说,看门狗实体允许让一个监听器在空闲时间内保持 360 秒。
基于上述“看门狗实体”行为,此功能看起来更适合缓慢流量消息传递,例如唤醒和监视 ASB 实体上的侦听器。
请注意,在订阅级别使用短重试时间策略可以避免侦听器的 360 秒空闲时间,因此可以在 5 分钟重试时间内再次调用订阅者 3 次。
出于测试目的,以下是用于订阅 ASB 事件的 EventGridTrigger 函数的代码 sn-p。
#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
#r "Newtonsoft.Json"
using System;
using System.Threading.Tasks;
using System.Text;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.ServiceBus.Primitives;
// sasToken cache
static SasTokenHelper helper = new SasTokenHelper(Environment.GetEnvironmentVariable("AzureServiceBusConnectionString"));
public static async Task Run(JObject eventGridEvent, ILogger log)
{
log.LogInformation(eventGridEvent.ToString());
// from the eventgrid payload
var requestUri = $"{eventGridEvent["data"]?["requestUri"]?.Value<string>()}?api-version=2015-01";
using (var client = new HttpClient())
{
client.DefaultRequestHeaders.Add("Authorization", helper.GetSasToken());
do
{
// read & delete the message
var response = await client.DeleteAsync(requestUri);
// check for message
if (response.StatusCode != HttpStatusCode.OK)
{
log.LogWarning($">>> No message <<<");
break;
}
// message body
string jsontext = await response.Content.ReadAsStringAsync();
// show the message
log.LogInformation($"\nHeaders:\n\t{string.Join("\n\t", response.Headers.Select(i => $"{i.Key}={i.Value.First()}"))}\nBody:\n\t{jsontext}");
} while (true);
}
await Task.CompletedTask;
}
// helpers
class SasTokenHelper
{
DateTime expiringSaS;
uint sasTTLInMinutes = 10;
string sasToken = string.Empty;
(string hostname, string keyname, string key) config;
public SasTokenHelper(string connectionString)
{
config = GetPartsFromConnectionString(connectionString);
GetSasToken();
}
public string GetSasToken()
{
lock (sasToken)
{
if (expiringSaS < DateTime.UtcNow.AddMinutes(-1))
{
this.sasToken = GetSASToken(config.hostname, config.key, config.keyname, sasTTLInMinutes);
expiringSaS = DateTime.UtcNow.AddMinutes(sasTTLInMinutes);
}
return sasToken;
}
}
internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
{
var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Split(new[] { '=' }, 2)).ToDictionary(x => x[0].Trim(), x => x[1].Trim());
return (parts["Endpoint"] ?? "", parts["SharedAccessKeyName"] ?? "", parts["SharedAccessKey"] ?? "");
}
internal string GetSASToken(string resourceUri, string key, string keyName = null, uint minutes = 10)
{
var tp = SharedAccessSignatureTokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key, TimeSpan.FromMinutes(minutes));
return tp.GetTokenAsync(resourceUri, TimeSpan.FromSeconds(60)).Result.TokenValue;
}
}