【问题标题】:Retrieve IoT Hub Twin from ServiceBus triggered azure function从 ServiceBus 中检索 IoT Hub Twin 触发的 azure 函数
【发布时间】:2018-10-08 11:58:06
【问题描述】:

我们正在将数据从 IoT 设备发送到 Azure IoT Hub,并尝试将某种类型的消息传递给 Azure Function。

目前,我们通过创建 Azure 服务总线端点并在 IoTHub 中创建 消息路由 来实现。它按预期工作,Azure 函数正确接收消息。

现在,我们想从 Azure 函数中的 IoT 中心获取 DeviceId,以及在 Device Twin中定义的 Tags >,我完全不知道该怎么做。

如果我们要使用EventHubTrigger,看起来会很简单,做这样的事情:

public static class Test
{
    [FunctionName("TestQueueTrigger")]
    public static void Run(
        [EventHubTrigger("messages/events", Connection = "IoTHubConnection")]
        EventData message,
        Twin deviceTwin,
        TraceWriter log)
    { ... }
}

但目前还不清楚如何使用服务总线触发器来完成。

此外,我们希望将所有消息(独立于路由)存储到 Azure Data Lake 存储,我对它的工作原理有点迷茫。

【问题讨论】:

  • 您想通过 ServiceBus 触发设备孪生更改事件上的功能吗?所有 Azure IoT Hub 遥测消息都可以独立于 Azure 流分析作业存储到 ADL 存储(无路由、无自定义终结点)。
  • 在从设备传递一些附加值(例如,消息类型)的情况下,您可以使用消息属性集合(用于用户),其他属性集合,例如系统属性正在持有诸如 deviceId 等信息。

标签: c# azure azure-functions azureservicebus azure-iot-hub


【解决方案1】:

Azure IoT Hub 设备到云消息格式描述为here。这种格式没有设备孪生属性。设备孪生存储在云后端,可以根据到特定端点(内置和/或自定义端点)的 iot 集线器路由通知它们的更改。

您的函数“TestQueueTrigger”示例在版本 1 中使用 azure-functions-iothub-extension。扩展输入绑定 Twin 允许使用单独的调用获取设备孪生在扩展内:

deviceTwin = await registryManager.GetTwinAsync(attribute.DeviceId);

基本上,这个扩展也可以用于ServiceBusTrigger 绑定。 请注意,此扩展只能用于函数版本 1,因此我建议使用例如 REST API Get Twin 调用在函数中获取设备孪生。

更新

以下代码 sn-p 显示了 ServiceBusTrigger 函数和 REST API Get Twin 调用的示例。

run.csx 文件:

#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
#r "..\\bin\\Microsoft.Azure.Devices.Shared.dll"
#r "Microsoft.Azure.WebJobs.ServiceBus"
#r "Newtonsoft.Json"


using System;
using System.Threading.Tasks;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Azure.ServiceBus;
using System.Globalization;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Web;
using Microsoft.Azure.Devices.Shared;

// reusable proxy
static HttpClientHelper iothub = new HttpClientHelper(Environment.GetEnvironmentVariable("AzureIoTHubShariedAccessPolicy"));

public static async Task Run(Message queueItem, ILogger log)
{
    // payload
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(queueItem.Body)}");

    // device identity Id
    var deviceId = queueItem.UserProperties["iothub-connection-device-id"];

    // get the device twin
    var response = await iothub.Client.GetAsync($"/twins/{deviceId}?api-version=2018-06-30");
    response.EnsureSuccessStatusCode();
    Twin twin = await response.Content.ReadAsAsync<Twin>();

    log.LogInformation(JsonConvert.SerializeObject(twin.Tags, Formatting.Indented));

    await Task.CompletedTask;
}


// helpers
class HttpClientHelper
{
    HttpClient client;
    DateTime expiringSaS;
    (string hostname, string keyname, string key) config;

    public HttpClientHelper(string connectionString)
    {
        config = GetPartsFromConnectionString(connectionString);
        client = new HttpClient() { BaseAddress = new Uri($"https://{config.hostname}")};
        SetAuthorizationHeader();         
    }

    public HttpClient Client
    {
        get
        {          
            if (expiringSaS < DateTime.UtcNow.AddMinutes(-1))
            {
               SetAuthorizationHeader();  
            }         
            return client;
        }
    }

    internal void SetAuthorizationHeader()
    {
        lock (client)
        {
            if (expiringSaS < DateTime.UtcNow.AddMinutes(-1)) 
            {
                string sasToken = GetSASToken(config.hostname, config.key, config.keyname, 1);
                if (client.DefaultRequestHeaders.Contains("Authorization"))
                    client.DefaultRequestHeaders.Remove("Authorization");
                client.DefaultRequestHeaders.Add("Authorization", sasToken);
                expiringSaS = DateTime.UtcNow.AddHours(1);
            }
        }
    }

    internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
    {
        var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Split(new[] { '=' }, 2)).ToDictionary(x => x[0].Trim(), x => x[1].Trim());
        return (parts["HostName"] ?? "", parts["SharedAccessKeyName"] ?? "", parts["SharedAccessKey"] ?? "");
    }

    internal string GetSASToken(string resourceUri, string key, string keyName = null, uint hours = 24)
    {
        var expiry = GetExpiry(hours);
        string stringToSign = HttpUtility.UrlEncode(resourceUri) + "\n" + expiry;
        HMACSHA256 hmac = new HMACSHA256(Convert.FromBase64String(key));

        var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
        var sasToken = String.Format(CultureInfo.InvariantCulture, $"SharedAccessSignature sr={HttpUtility.UrlEncode(resourceUri)}&sig={HttpUtility.UrlEncode(signature)}&se={expiry}");
        if (!string.IsNullOrEmpty(keyName))
            sasToken += $"&skn={keyName}";
        return sasToken;
    }

    internal string GetExpiry(uint hours = 24)
    {
        TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
        return Convert.ToString((int)sinceEpoch.TotalSeconds + 3600 * hours);
    }
}

function.json:

{
  "bindings": [
    {
      "name": "queueItem",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "myQueue",
      "connection": "myConnectionString_SERVICEBUS"
    }
  ]
}

【讨论】:

  • 感谢您的帮助,我发现 Azure Functions 和 IoT 中心的文档仍然很一般,理解在 v1、v2 和所有不同的方式之间应该使用什么真的很混乱创建一个 Azure 函数。
  • 如果我想使用 REST API 调用,是否有简单的方法可以在函数中获取所需的身份验证令牌?
  • 关键是,设备孪生不是 iot 遥测消息的一部分,因此我们必须通过 Azure IoT Hub 面向服务的端点查询云后端。 DeviceId 是存储在消息系统属性集合中的消息的一部分。换句话说,iot消息可以由AF触发,但查询必须是输入绑定扩展或函数体中的单独调用。
  • REST API GetTwin调用是个不错的选择,根据DeviceId可以获取设备孪生属性,授权头可以根据以下文档实现docs.microsoft.com/en-us/azure/iot-hub/…
  • 注意,一旦我们有了 sas 授权标头,就会有许多通过 REST API 对面向服务的端点的有用调用,例如:CreateDevice、DeleteDevice、UpdateDeviceTwin、SendDirectMethod 等,几乎所有的除了向设备发送消息(即 AMQP 消息)
猜你喜欢
  • 2022-06-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-11-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多