【问题标题】:Calculate Device seconds On using Kinesis Analytics使用 Kinesis Analytics 计算设备秒数
【发布时间】:2018-03-22 12:36:12
【问题描述】:

我正在尝试使用 Kinesis 分析并解决了许多问题,但实际上坚持以下几点:

我实际上有一个流,其中包含反映设备何时打开和关闭的记录,例如:

device_id | timestamp | reading 1 | 2011/09/01 22:30 | 1 1 | 2011/09/01 23:00 | 0 1 | 2011/09/02 03:30 | 1 1 | 2011/09/02 03:31 | 0

  • 我在 reading 字段中使用 1 表示开启,使用 0 表示关闭。

我想要完成的是创建一个 PUMP,它将设备每 5 分钟窗口上的秒数重定向到另一个流,如下所示:

device_id | timestamp | reading 1 | 2011/09/01 22:35 | 300 1 | 2011/09/01 22:40 | 300 1 | 2011/09/01 22:45 | 300 1 | 2011/09/01 22:50 | 300 1 | 2011/09/01 22:55 | 300 1 | 2011/09/01 23:00 | 300 1 | 2011/09/01 23:05 | 0 1 | 2011/09/01 23:10 | 0 ...

不确定这是否可以通过 Kinesis Analytics 完成,我实际上可以通过查询 SQL 表来完成它,但我被流数据这一事实所困扰。

【问题讨论】:

    标签: amazon-web-services analytics amazon-kinesis amazon-kinesis-firehose


    【解决方案1】:

    Drools Kinesis Analytics(亚马逊上的托管服务)可以做到这一点:

    类型:

    package com.text;
    
    import java.util.Deque;
    
    declare EventA
        @role( event )
        id: int;
        timestamp: long;
        on: boolean;
    
        //not part of the message
        seen: boolean;
    end
    
    declare Session
        id: int @key;
        events: Deque;
    end
    
    declare Report
        id: int @key;
        timestamp: long @key;
        onInLast5Mins: int;
    end
    

    规则:

    package com.text;
    
    import java.util.Deque;
    import java.util.ArrayDeque;
    
    declare enum Constants
    
        // 20 seconds - faster to test
        WINDOW_SIZE(20*1000);
    
        value: int;
    end
    
    rule "Reporter"
        // 20 seconds - faster to test
        timer(cron:0/20 * * ? * * *)
    when
        $s: Session()
    then
        long now = System.currentTimeMillis();
    
        int on = 0; //how long was on
        int off = 0; //how long was off
        int toPersist = 0; //last interesting event
    
        for (EventA a : (Deque<EventA>)$s.getEvents()) {
            toPersist ++;
            boolean stop = false;
            // time elapsed since the reading till now
            int delta = (int)(now - a.getTimestamp());
            if (delta >= Constants.WINDOW_SIZE.getValue()) {
                delta = Constants.WINDOW_SIZE.getValue();
                stop = true;
            }
    
            // remove time already counted
            delta -= (on+off);
            if (a.isOn())
                on += delta;
            else
                off += delta;
    
            if (stop)
                break;
        }
    
        int toRemove = $s.getEvents().size() - toPersist;
        while (toRemove > 0) {
            // this event is out of window of interest - delete
            delete($s.getEvents().removeLast());
            toRemove --;
        }
    
        insertLogical(new Report($s.getId(), now, on));
    end
    
    rule "SessionCreate"
    when
        // for every new EventA
        EventA(!seen, $id: id) from entry-point events
        // check there is no session
        not (exists(Session(id == $id)))
    then
        insert(new Session($id, new ArrayDeque()));
    end
    
    rule "SessionJoin"
    when
        // for every new EventA
        $a : EventA(!seen) from entry-point events
        // get event's session
        $g: Session(id == $a.id)
    then
        $g.getEvents().push($a);
        modify($a) {
            setSeen(true),
            setTimestamp(System.currentTimeMillis())
        };
    end
    

    【讨论】:

    • 我正在检查 Drools,看起来很酷,但现在想使用 SQL 来实现它。
    【解决方案2】:

    您可以使用带有Stride HTTP API 的SQL 来执行此操作。您可以将连续 SQL 查询的网络链接在一起并订阅更改流,如果您想在发生这种情况时采取某种任意操作,还可以触发实时 webhook。有关更多信息,请参阅Stride API docs

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-12-14
      • 1970-01-01
      • 2017-08-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-18
      相关资源
      最近更新 更多