【问题标题】:Event-time Temporal Join in Apache Flink only works with small datasetsApache Flink 中的 Event-time Temporal Join 仅适用于小型数据集
【发布时间】:2026-01-17 13:40:01
【问题描述】:

背景:我正在尝试使用从 CSV 文件中读取的两个“大型(r)”数据集/表(左表中的 16K+ 行,右表中的行数较少)进行事件时间时态连接.这两个表都是仅附加表,即它们的数据源当前是 CSV 文件,但将成为 Debezium 在 Pulsar 上发出的 CDC 更改日志。 我正在使用相当新的SYSTEM_TIME AS OF 语法。

问题:连接结果只是部分正确,即在查询执行的开始(前 20% 左右),左侧的行与来自右边,虽然理论上,他们应该。几秒钟后,有更多匹配项,到查询结束时,左侧的行与右侧的行正确匹配/连接。 每次我运行查询时,它都会根据(不)匹配的行显示其他结果。

两个数据集都没有按各自的事件时间排序。它们按主键排序。所以真的是this case,只是有更多的数据。

本质上,右侧是一个随时间变化的查找表,我们确信对于每个左侧记录都有一个匹配的右侧记录,因为两者都是在原始数据库中以 +/- 相同的位置创建的立即的。最终,我们的目标是动态物化视图,其中包含与我们在启用 CDC 的源数据库 (SQL Server) 中连接 2 个表时相同的数据。

显然,我想在 complete 数据集上实现 正确 连接,如 in the Flink docs
所述 与只有几行的小数据集(如here)的简单示例和 Flink 测试代码不同,较大数据集的连接不会产生正确的结果。

我怀疑,当探测/左表开始流动时,构建/右表尚未“在内存中”,这意味着左行没有找到匹配的右行,而它们应该 - 如果右表会更早开始流动。这就是left join 为右表的列返回空值的原因。

我已包含我的代码:

@Slf4j(topic = "TO_FILE")
public class CsvTemporalJoinTest {

    private final String emr01Ddl =
            "CREATE TABLE EMR01\n" +
                    "(\n" +
                    "    SRC_NO         STRING,\n" +
                    "    JRD_ETT_NO     STRING,\n" +
                    "    STT_DT         DATE,\n" +
                    "    MGT_SLT_DT     DATE,\n" +
                    "    ATM_CRT_DT     DATE,\n" +
                    "    LTD_MDT_IC     STRING,\n" +
                    "    CPN_ORG_NO     STRING,\n" +
                    "    PTY_NO         STRING,\n" +
                    "    REG_USER_CD    STRING,\n" +
                    "    REG_TS         TIMESTAMP,\n" +
                    "    MUT_USER_CD    STRING,\n" +
                    "    MUT_TS         TIMESTAMP(3),\n" +
                    "    WATERMARK FOR MUT_TS AS MUT_TS,\n" +
                    "    PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "   'connector' = 'filesystem',\n" +
                    "   'path' = '" + getCsv1() + "',\n" +
                    "   'format' = 'csv'\n" +
                    ")";

    private final String emr02Ddl =
            "CREATE TABLE EMR02\n" +
                    "(\n" +
                    "    CPN_ORG_NO  STRING,\n" +
                    "    DSB_TX      STRING,\n" +
                    "    REG_USER_CD STRING,\n" +
                    "    REG_TS      TIMESTAMP,\n" +
                    "    MUT_USER_CD STRING,\n" +
                    "    MUT_TS      TIMESTAMP(3),\n" +
                    "    WATERMARK FOR MUT_TS AS MUT_TS,\n" +
                    "    PRIMARY KEY (CPN_ORG_NO) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "   'connector' = 'filesystem',\n" +
                    "   'path' = '" + getCsv2() + "',\n" +
                    "   'format' = 'csv'\n" +
                    ")";

    @Test
    public void testEventTimeTemporalJoin() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        var tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(emr01Ddl);
        tableEnv.executeSql(emr02Ddl);

        Table result = tableEnv.sqlQuery("" +
                "SELECT *" +
                "   FROM EMR01" +
                "   LEFT JOIN EMR02 FOR SYSTEM_TIME AS OF EMR01.MUT_TS" +
                "       ON EMR01.CPN_ORG_NO = EMR02.CPN_ORG_NO");

        tableEnv.toChangelogStream(result).addSink(new TestSink());
        env.execute();

        System.out.println("[Count]" + TestSink.values.size());
        //System.out.println("[Row 1]" + TestSink.values.get(0));
        //System.out.println("[Row 2]" + TestSink.values.get(1));
        AtomicInteger i = new AtomicInteger();
        TestSink.values.listIterator().forEachRemaining(value -> log.info("[Row " + i.incrementAndGet() + " ]=" + value));
    }

    private static class TestSink implements SinkFunction<Row> {

        // must be static
        public static final List<Row> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Row value, SinkFunction.Context context) {
            values.add(value);
        }
    }

    String getCsv1() {
        try {
            return new ClassPathResource("/GBTEMR01.csv").getFile().getAbsolutePath();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    String getCsv2() {
        try {
            return new ClassPathResource("/GBTEMR02.csv").getFile().getAbsolutePath();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

有没有办法解决这个问题?例如。有没有办法先将右侧加载到 Flink 状态,然后开始加载/流式传输左侧?这会是一个好方法吗,因为这个问题很重要:多久之后?左边什么时候可以开始流动?

我们正在使用 Flink 1.13.3。

【问题讨论】:

    标签: join apache-flink temporal


    【解决方案1】:

    这种时间/版本连接依赖于准确的水印。 Flink 依靠水印来知道哪些行可以安全地从正在维护的状态中删除(因为它们不再影响结果)。

    您使用的水印表明行按MUT_TS 排序。由于这不正确,因此连接无法产生完整的结果。

    要解决此问题,应使用类似这样的内容定义水印

    WATERMARK FOR MUT_TS AS MUT_TS - INTERVAL '2' MINUTE
    

    间隔表示需要适应多少无序。

    【讨论】:

    • 谢谢大卫,显然这解决了它 - 你真的为我节省了一天。但我认为我只是“幸运”,2 分钟似乎就足够了。实际上(逻辑上)这两个流都来自 Debezium,并按 PK(一个序列)排序。因此,具有较低 PK 的记录也具有较低的 MUT_TS(= 初始创建时的 REG_TS)。当然,这也不能说是巧合,但如果 PK不是一个序列,这根本就不会起作用,对吧?所以总的来说,两个流都按 MUT_TS 排序不是更好吗?但这提出了一个问题:Debezium 可以实现后者吗?
    • Debezium 按照它们发生的顺序产生事件;我觉得应该没问题。