【问题标题】:Apache Flink: How to group every n rows with the Table API?Apache Flink:如何使用 Table API 对每 n 行进行分组?
【发布时间】:2018-07-17 13:00:54
【问题描述】:

最近我正在尝试使用 Apache Flink 进行快速批处理。 我有一个带有 column:value 和不相关索引列的表

基本上我想计算每 5 行值的平均值和范围。然后我将根据我刚刚计算的平均值计算平均值和标准差。所以我想最好的方法是使用Tumble窗口。

看起来像这样

DataSet<Tuple2<Double, Integer>> rawData = {get the source data};
Table table = tableEnvironment.fromDataSet(rawData);
Table groupedTable = table
            .window(Tumble.over("5.rows").on({what should I write?}).as("w")
            .groupBy("w")
            .select("f0.avg, f0.max-f0.min");

{The next step is to use groupedTable to calculate overall mean and stdDev} 

但我不知道在.on() 中写什么。我试过"proctime",但它说没有这样的输入。我只希望它按从源读取的顺序分组。但它必须是时间属性,所以我不能使用"f2" - 索引列也是排序。

我是否必须添加时间戳才能执行此操作?批处理中是否有必要,它会减慢计算速度吗?解决这个问题的最佳方法是什么?

更新: 我尝试在表格 API 中使用滑动窗口,但它得到了异常。

// Calculate mean value in each group
    Table groupedTable = table
            .groupBy("f0")
            .select("f0.cast(LONG) as groupNum, f1.avg as avg")
            .orderBy("groupNum");

//Calculate moving range of group Mean using sliding window
    Table movingRangeTable = groupedTable
            .window(Slide.over("2.rows").every("1.rows").on("groupNum").as("w"))
            .groupBy("w")
            .select("groupNum.max as groupNumB, (avg.max - avg.min) as MR");

例外是:

线程“main”java.lang.UnsupportedOperationException 中的异常:当前不支持在事件时间计算滑动组窗口。

在 org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.createEventTimeSlidingWindowDataSet(DataSetWindowAggregate.scala:456)

在 org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:139)

...

这是否意味着 Table API 不支持滑动窗口?如果我没记错的话,DataSet API 中没有窗口函数。那么如何在批处理中计算移动范围呢?

【问题讨论】:

    标签: apache-flink flink-sql


    【解决方案1】:

    window 子句用于定义基于窗口函数的分组,例如TumbleSession。除非您指定行的顺序,否则在 Table API(或 SQL)中没有很好地定义每 5 行分组。这是在Tumble 函数的on 子句中完成的。由于此功能源自流处理,on 子句需要时间戳属性。

    您可以使用currentTimestamp() 函数获取当前时间的时间戳。但是,我应该指出,Flink 会对数据进行排序,因为它不知道函数的单调性。此外,所有这些都将具有 1 的并行度,因为没有允许分区的子句。

    或者,您还可以实现一个用户定义的标量函数,将索引属性转换为时间戳(实际上是一个 Long 值)。但同样,Flink 会对数据进行完整的排序。

    【讨论】:

    • 感谢您的回答!那么在这种情况下,您是否建议我应该添加一个索引列并将其转换为长列?现在我在 DataSet 中使用 zipWithIndex() 来应用索引并使用自定义 flatMap 函数为每一行分配组号
    • 我来自 DataSet API,您可能想看看MapPartitionFunction,它也可用于此计算并避免完整排序。在这种情况下,您将不需要 Table API。
    • 嗨,我尝试将我的 groupNumber 从 INTEGER 转换为 LONG 并尝试将其用作窗口中的时间属性。但是我得到了这个异常:线程“main”中的异常java.lang.UnsupportedOperationException:当前不支持在事件时间计算滑动组窗口。如果我写错了什么,你能建议一下吗?我将在问题的末尾发布我的代码。谢谢
    • 哦,是的,批处理 Table API / SQL 查询似乎尚不支持事件时间计数窗口。我建议使用MapPartitionFunction 来实现这一点,如果数据已经排序,这应该会更有效率。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-01
    • 2014-08-01
    • 2018-05-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多