【问题标题】:Azure Streaming Analytics stateful aggregationAzure 流分析有状态聚合
【发布时间】:2018-04-11 13:21:05
【问题描述】:

我有一个 IoT 中心,其中包含多个将数据发送到流式分析的设备。来自设备的消息包含有关其健康状况的浮动信息(介于 0 和 1 之间)。流式分析将数据输出到服务总线,我想添加有关包含给定时刻跨设备平均运行状况的字段的信息。

我想使用用户定义的聚合每 10 秒生成一次此值,但看起来它只使用时间范围内的最后一条消息。

我是否正确使用了 UDA,如果没有,是否有任何其他方法可以在多个设备或其他一些有状态函数之间求平均值?

UDA 代码:

function main() {
this.init = function () {
    this.state = {};
}

this.accumulate = function (value, device_id) {
    this.state[device_id] = value;
}

/*this.deaccumulate = function (value, timestamp) {
    this.state -= value;
}

this.deaccumulateState = function (otherState) {
      this.state -= otherState.state;
}*/

this.computeResult = function () {
    length = 0,
    total  = 0;
    for (var device in this.state) {
        total += this.state[device];
        length++;
    }
    return total/length;
}
}

查询:

SELECT
uda.fleetHealth(device_health_status.level, device_id) as avg_health
INTO
    bustopic2
FROM
    iotdata
GROUP BY TumblingWindow(second, 10)

【问题讨论】:

    标签: azure azure-stream-analytics


    【解决方案1】:

    您只能将最后一条消息作为 1 您在 Java 脚本中使用地图。 2 第二个参数始终相同且等于应用程序时间戳,即使您将其定义为 device_id。 如果你想计算所有设备的平均水平,你应该这样做:

    function UDASample() {
        this.init = function () {
            this.state = 0;
            this.length = 0;
        }
    
        this.accumulate = function (value, timestamp) {
            this.state += value;
            this.length = length + 1;
        }
    
        /*this.deaccumulate = function (value, timestamp) {
            this.state -= value;
        }
    
        this.deaccumulateState = function (otherState) {
              this.state -= otherState.state;
        }*/
    
        this.computeResult = function () {
            return this.state/this.length;
        }
    }
    
    SELECT
    uda.fleetHealth(device_health_status.level) as avg_health
    INTO
        bustopic2
    FROM
        iotdata
    GROUP BY TumblingWindow(second, 10)
    

    如果你想统计每个设备的平均水平,你可以使用上面相同的 UDA 并使用这样的脚本:

    SELECT device_id,
    uda.fleetHealth(device_health_status.level) as avg_health
    INTO
        bustopic2
    FROM
        iotdata
    GROUP BY TumblingWindow(second, 10), device_id
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-14
      • 2019-12-30
      • 1970-01-01
      • 2020-08-31
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多