【问题标题】:Analyze a tumbling window with a lag in AWS Kinesis Analytics SQL在 AWS Kinesis Analytics SQL 中分析具有滞后的滚动窗口
【发布时间】:2019-08-28 09:31:58
【问题描述】:

我有一个似乎应该由 Kinesis Analytics SQL 支持的用例,但我似乎无法弄清楚。

这是我的场景:

  • 我有一个输入数据流,其中每个事件都有一个 event_time 字段和 device_id 字段。
  • 我想按 event_time 和 device_id 聚合数据。这里 event_time 作为源数据中的一个字段提供,它不是将该行添加到 Kinesis Analytics 应用程序的 ROWTIME,也不是大致到达时间。
  • 向我的流发送数据的进程有一些延迟,因此可能会在 event_time 发生后 3 分钟内将行添加到我的流中。

我的目标是获得一份按 event_time 和 device_id 汇总的报告,其中每个 event_time 有一行,并在该行中包含该 event_time 的所有数据。

所以,我的数据流可能如下所示:

rowtime, event_time, device_id, num_things
12:29:04, 12:27:00, server1, 19
12:30:22, 12:28:00, server1, 33
12:30:23, 12:27:00, server2, 8
12:30:25, 12:29:00, server1, 11
12:31:33, 12:28:00, server2, 2
12:31:44, 12:29:00, server3, 83
12:32:56, 12:29:00, server2, 6

这里的关键点是,event_times 的数据(例如 12:27)会在几分钟内出现,并且可能比将这些数据添加到 Kinesis Analytics 流时提前最多 3 分钟。

我希望我的输出是:

event_time, total_num_things
12:27, 27  <- sums up 19 + 8 for event_time 12:27
12:28, 35 <- sums up 33+2 for event_time 12:28
12:29, 100 <- sums up 11+83+6 for event_time 12:29

这可能吗?

我能找到的所有示例在输出中都会有一个 ROWTIME 翻滚窗口,因此 event_time 的聚合可能会在多个 ROWTIME 分钟桶中分解。

【问题讨论】:

    标签: amazon-kinesis


    【解决方案1】:

    【讨论】:

    • 谢谢好心的陌生人!我浏览了文档,这看起来很有希望,不幸的是我已经转向了另一种技术,所以我无法验证。如果这积累了其他赞成票,我会接受。
    • 你采用了什么技术?只是好奇。
    • 我的应用程序逻辑允许我自己在 AWS Lambda 中进行聚合。
    【解决方案2】:

    对于那些没有转向新技术的人 ;-)。滑动窗口在这里不太合适,因为我们没有对时间间隔内的事件设置约束,而是我们希望总是按时间分组然后求和。只是事件无法立即使用。

    所以语义更接近工作会话,其中 sessionId 是时间点。

    这可以用 Drools 表示:

    类型:

    package com.test;
    
    import java.util.List;
    
    declare EventA
        @role(event)
    
        eventTime: long;
        deviceId: int;
        numThings: int;
        seen: boolean;
    end
    
    declare Group
        eventTime: long @key;
        events: List;
    end
    
    declare Summary
        eventTime: long;
        sumNumThings: int;
    end
    

    规则:

    package com.test;
    
    import java.util.List;
    import java.util.ArrayList;
    import java.util.stream.Collectors;
    
    rule "GroupCreate"
    when
        // for every new EventA
        EventA(seen == false, $time: eventTime) from entry-point events
        // check there is no group
        not (exists(Group(eventTime == $time)))
    then
        insert(new Group($time, new ArrayList()));
    end
    
    rule "GroupJoin"
    when
        // for every new EventA
        $a : EventA(seen == false) from entry-point events
        // get event's group
        $g: Group(eventTime == $a.eventTime)
    then
        $g.getEvents().add($a);
        modify($a) {setSeen(true);}
    end
    
    rule "Summarize"
    // if session timed out, clean up first
    salience 5
    when
        // for every EventA
        $a : EventA() from entry-point events
        // check there is no more events within 30 seconds
        not (exists(EventA(this != $a, eventTime == $a.eventTime,
            this after[0, 30s] $a) from entry-point events))
        // get event's group
        $g: Group(eventTime == $a.eventTime)
    then
        int sum = (int)$g.getEvents().stream().collect(
            Collectors.summingInt(EventA::getNumThings));
        insertLogical(new Summary($g.getEventTime(), sum));
    
        // cleanup
        for (Object $x : $g.getEvents())
            delete($x);
        delete($g);
    end
    

    您可以使用this service 编写 Drools Kinesis Analytics

    【讨论】:

      【解决方案3】:

      似乎“交错窗口”是您正在寻找的。​​p>

      https://docs.aws.amazon.com/kinesisanalytics/latest/dev/stagger-window-concepts.html

      使用交错窗口是一种窗口方法,适用于分析在不一致时间到达的数据组。它非常适合任何时间序列分析用例,例如一组相关的销售或日志记录。

      例如,VPC 流日志的捕获窗口约为 10 分钟。但如果您在客户端上聚合数据,它们的捕获窗口最长可达 15 分钟。交错窗口非常适合聚合这些日志以进行分析。

      错开窗口解决了相关记录不落入同一时间限制窗口的问题,例如使用翻滚窗口时。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-03-03
        • 1970-01-01
        • 2013-10-27
        • 2019-06-17
        • 1970-01-01
        • 2017-04-30
        • 2023-03-26
        相关资源
        最近更新 更多