【问题标题】:Flink: DataStream left join Table. Super simpleFlink:DataStream 左连接表。超级简单
【发布时间】:2025-12-04 00:20:07
【问题描述】:
        DataStream<String> sourceStream = streamEnv.fromElements("key_a", "key_b", "key_c", "key_d");

        Table lookupTable = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("my_key", DataTypes.STRING()),
                        DataTypes.FIELD("my_value", DataTypes.STRING())
                ),
                Expressions.row("key_a", "value_a"),
                Expressions.row("key_b", "value_b")
        );

我想将流加入到表格中。

这显然是一个简化的演示场景。在使用更大的生产数据集之前,我想了解如何使用 Flink API 通过玩具数据集实现这一目标。

Table joins 上的文档展示了如何连接两个表并取回另一个表,这不是我想要的:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#joins

DataStream joins 上的文档显示在一个时间窗口上连接两个流,这也不是我想要的:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    我相信这就是您正在寻找的。此示例将 sourceStream 转换为动态表,将其与查找表连接,然后将生成的动态表转换回流以进行打印。

    您可以改为使用 DataStream API 对 resultStream 进行进一步处理。

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Expressions;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    public class JoinExample {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            DataStream<String> sourceStream = env.fromElements("key_a", "key_b", "key_c", "key_d");
            Table streamTable = tableEnv.fromDataStream(sourceStream, $("stream_key"));
    
            Table lookupTable = tableEnv.fromValues(
                    DataTypes.ROW(
                            DataTypes.FIELD("lookup_key", DataTypes.STRING()),
                            DataTypes.FIELD("lookup_value", DataTypes.STRING())
                    ),
                    Expressions.row("key_a", "value_a"),
                    Expressions.row("key_b", "value_b")
            );
    
            Table resultTable = streamTable
                    .join(lookupTable).where($("stream_key").isEqual($("lookup_key")))
                    .select($("stream_key"), $("lookup_value"));
    
            DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
    
            resultStream.print();
    
            env.execute();
        }
    }
    

    输出是

    key_b,value_b
    key_a,value_a
    

    【讨论】: