【问题标题】:Windowed grouping using message keys in PyFlink在 PyFlink 中使用消息键进行窗口分组
【发布时间】:2025-12-20 14:00:11
【问题描述】:

我正在为一个项目使用 PyFlink 1.13,我正在尝试执行以下操作:

  • 从消息包含 UserId 的 Kafka 主题中读取数据
  • 对数据执行超过 2 秒的翻转窗口
  • 使用我的 windows 值调用 Python UDF

这是我试图实现的数据流的可视化表示:

我正在使用 PyFlink 的 Table API,并且我的两个表都是使用 SQL DDL 声明的。

我的查询执行如下所示:

SELECT UserId, Timestamp, my_udf(Data) AS Result,
FROM InputTable 
GROUP BY TUMBLE(Timestamp, interval 2 SECONDS), UserId, Data

这是我的 Python UDF 函数:

@udf(input_types=SOME_INPUT_TYPE, result_type=SOME_OUTPUT_TYPE)
def my_udf(window_data):
    
    # ...business logic here with window_data
    
    return some_result

我目前的问题是,由于某种原因,my_udf 函数分别接收每一行,因此在上面的示例中将被调用 4 次而不是 2 次。

我一直在研究 PyFlink 文档,但无法找到如何实现我想要的。

该信息可能在文档中,但我似乎无法找到/理解它。

任何帮助将不胜感激。

谢谢!

【问题讨论】:

    标签: python apache-flink pyflink


    【解决方案1】:

    如果我正确理解您要执行的操作,您希望修改您的查询,使其Data 列或Timestamp 分组

    SELECT UserId, TUMBLE_END(Timestamp, interval '2' SECONDS), my_udf(Data) AS Result,
    FROM InputTable 
    GROUP BY TUMBLE(Timestamp, interval '2' SECONDS), UserId
    

    然后您想要实现一个user-defined aggregate function,它将来自给定用户的窗口中所有行的数据列的值聚合为单个值。我在上面链接到的文档中有一个示例。

    【讨论】:

    • 这正是我需要做的。非常感谢!