【问题标题】:EventGridTrigger for Azure Service Bus TopicAzure 服务总线主题的 EventGridTrigger
【发布时间】:2026-02-13 13:25:03
【问题描述】:

我创建了一个基于 EventGrid 触发器的 Azure 函数。只要有新消息到达服务总线主题,就会触发该触发器。下面是生成的函数模板

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static void Run(JObject eventGridEvent, TraceWriter log)
{
    log.Info(eventGridEvent.ToString(Formatting.Indented));
}

我对 Azure 函数的要求是处理数据并将其存储在 ADLS 中。现在我如何解析/反序列化来自 JObject 类型的数据。在将其保存到数据湖存储之前,我需要在此函数中规范化数据。 我需要覆盖该功能吗?

请提供一些详细信息/参考以满足此要求

【问题讨论】:

    标签: azure-functions azure-data-lake azure-servicebus-topics azure-eventgrid


    【解决方案1】:

    Service Bus (Premium) 为两种场景发送事件:

    1. ActiveMessagesWithNoListenersAvailable
    2. DeadletterMessagesAvailable

    当有与特定实体关联的消息并且不存在活动的侦听器时,将发出第一个事件。实体将在有效负载中指示,以及访问它所需的其他信息(例如命名空间或要接收的订阅主题)。架构在documentation 中定义。

    第二个事件模式与第一个类似,是为死信队列发出的。

    现在我如何解析/反序列化来自 JObject 类型的数据。在将其保存到数据湖存储之前,我需要在此函数中规范化数据。我需要覆盖这个函数吗?

    eventGridEvent JSON 本身不会向您提供 Azure 服务总线消息。 您将需要首先知道原始消息是如何序列化的,即发送方使用了什么。该反序列化需要进入函数,然后是编写对象数据湖的代码。

    【讨论】:

      【解决方案2】:

      除了 Sean 的回答之外,Azure 服务总线与 AEG 的集成还可以为 ASB 实体构建一些看门狗功能。请注意,此集成与存储 blob 帐户不同,每次创建/删除 blob 时都会发布事件。 换句话说,ASB 不会为到达 ASB 实体的每条消息发布事件,这些事件像实体看门狗一样发布。

      这种实体看门狗使用如下逻辑:

      1. 当实体中没有消息时,不会发布任何事件。
      2. 当第一条消息到达实体并且实体上 360+ 秒内没有活动侦听器时立即发布事件
      3. 当侦听器仍处于非活动状态且实体中至少有一条消息时,该事件每 120 秒发布一次
      4. 事件在 360 秒侦听器空闲(非活动)时间后发布,但实体中仍然至少有一条消息。例如,如果我们在实体中有 5 条消息,并且订阅者将使用 REST Api 仅拉出一条消息,则下一个事件将在 360 秒后发布。换句话说,看门狗实体允许让一个监听器在空闲时间内保持 360 秒。

      基于上述“看门狗实体”行为,此功能看起来更适合缓慢流量消息传递,例如唤醒和监视 ASB 实体上的侦听器。

      请注意,在订阅级别使用短重试时间策略可以避免侦听器的 360 秒空闲时间,因此可以在 5 分钟重试时间内再次调用订阅者 3 次。

      出于测试目的,以下是用于订阅 ASB 事件的 EventGridTrigger 函数的代码 sn-p。

      #r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
      #r "Newtonsoft.Json"
      
      using System;
      using System.Threading.Tasks;
      using System.Text;
      using System.Linq;
      using System.Net;
      using System.Net.Http;
      using System.Web;
      using Newtonsoft.Json;
      using Newtonsoft.Json.Linq;
      using Microsoft.Azure.ServiceBus.Primitives;
      
      
      
      // sasToken cache
      static SasTokenHelper helper = new SasTokenHelper(Environment.GetEnvironmentVariable("AzureServiceBusConnectionString"));
      
      public static async Task Run(JObject eventGridEvent, ILogger log)
      {
          log.LogInformation(eventGridEvent.ToString());
      
          // from the eventgrid payload
          var requestUri = $"{eventGridEvent["data"]?["requestUri"]?.Value<string>()}?api-version=2015-01";
      
          using (var client = new HttpClient())
          {
      
              client.DefaultRequestHeaders.Add("Authorization", helper.GetSasToken());
      
              do
              {
                  // read & delete the message 
                  var response = await client.DeleteAsync(requestUri);
      
                  // check for message
                  if (response.StatusCode != HttpStatusCode.OK)
                  {
                      log.LogWarning($">>> No message <<<");
                      break;
                  }
      
                  // message body
                  string jsontext = await response.Content.ReadAsStringAsync();
      
                  // show the message
                  log.LogInformation($"\nHeaders:\n\t{string.Join("\n\t", response.Headers.Select(i => $"{i.Key}={i.Value.First()}"))}\nBody:\n\t{jsontext}");
              } while (true);
      
          }
      
          await Task.CompletedTask;
      }
      
      
      
      
      // helpers
      class SasTokenHelper
      {
          DateTime expiringSaS;
          uint sasTTLInMinutes = 10;
          string sasToken = string.Empty;
          (string hostname, string keyname, string key) config;
      
          public SasTokenHelper(string connectionString)
          {
              config = GetPartsFromConnectionString(connectionString);
              GetSasToken();
          }
      
          public string GetSasToken()
          {
              lock (sasToken)
              {
                  if (expiringSaS < DateTime.UtcNow.AddMinutes(-1))
                  {
                      this.sasToken = GetSASToken(config.hostname, config.key, config.keyname, sasTTLInMinutes);
                      expiringSaS = DateTime.UtcNow.AddMinutes(sasTTLInMinutes);
                  }
                  return sasToken;
              }
          }
      
          internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
          {
              var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Split(new[] { '=' }, 2)).ToDictionary(x => x[0].Trim(), x => x[1].Trim());
              return (parts["Endpoint"] ?? "", parts["SharedAccessKeyName"] ?? "", parts["SharedAccessKey"] ?? "");
          }
      
          internal string GetSASToken(string resourceUri, string key, string keyName = null, uint minutes = 10)
          {
              var tp = SharedAccessSignatureTokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key, TimeSpan.FromMinutes(minutes));
              return tp.GetTokenAsync(resourceUri, TimeSpan.FromSeconds(60)).Result.TokenValue;
          }
      }
      

      【讨论】: