【问题标题】:Flink: no outer joins on DataStream?Flink:DataStream 上没有外连接?
【发布时间】:2016-11-12 04:07:30
【问题描述】:

我惊讶地发现在 Flink (DataStream docs) 中 DataStream 没有外连接。

对于DataSet,除了常规的join (DataSet docs) 之外,您还有所有选项:leftOuterJoinrightOuterJoinfullOuterJoin。但是对于DataStream,您只需使用普通的旧连接即可。

这是由于DataStream 的一些基本属性导致无法进行外部连接吗?或者也许我们可以在(接近?)未来期待这一点?

我真的可以在DataStream 上使用外连接来解决我正在处理的问题...有没有办法实现类似的行为?

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    您可以使用DataStream.coGroup() 转换实现外连接。 CoGroupFunction 接收两个迭代器(每个输入一个),它们为某个键的所有元素提供服务,如果没有找到匹配的元素,它们可能为空。这允许实现外连接功能。

    在 Flink 的下一个版本中,可能会将对外部连接的一流支持添加到 DataStream API。我目前不知道有任何此类努力。但是,在 Apache Flink JIRA 中创建问题可能会有所帮助。

    【讨论】:

    • 好的,我已经创建了这个问题,如果有帮助的话:jira。我会试试coGroup :)
    • 嗨@Fabian,可以使用connect 来实现吗?
    【解决方案2】:

    一种方法是从流 -> 表 -> 流,使用以下 api:FLINK TABLE API - OUTER JOIN

    这是一个java示例:

        DataStream<String> data = env.readTextFile( ... );
        DataStream<String> data2Merge = env.readTextFile( ... );
    
        ...
    
        tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
        tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");
    
        String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
        String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";
    
        Table tableLeft = tableEnv.sqlQuery(queryLeft);
        Table tableRight = tableEnv.sqlQuery(queryRight);
    
        Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);
    

    【讨论】:

    • 你如何处理retractStream?它将包含具有真/假的多行
    猜你喜欢
    • 1970-01-01
    • 2015-03-10
    • 1970-01-01
    • 2013-11-04
    • 1970-01-01
    • 1970-01-01
    • 2011-09-15
    • 2021-03-08
    • 2016-05-03
    相关资源
    最近更新 更多