【问题标题】:Left join operation in apache beamApache Beam 中的左连接操作
【发布时间】:2018-03-03 01:27:41
【问题描述】:

我有两个具有公共键列的数据集,我想执行左连接操作。 apache beam中是否有对应的函数在apache beam中执行left join操作?

【问题讨论】:

    标签: apache-beam


    【解决方案1】:

    Beam Java SDK 中有一个小的连接库,看看这个实现是否适合你:org.apache.beam.sdk.extensions.joinlibrary.Joinsource

    更新

    你可以用类似的方法自己实现它,利用CoGroupByKey:
    - 将PCollections 都放入KeyedPCollectionTuple
    - 应用CoGroupByKey,它将对每个窗口每个键的PCollections 中的元素进行分组;
    - 应用 ParDo 循环遍历 CoGroupByKey 的结果,一次加入左右记录,并发出结果(参见 CoGroupByKey example in the Beam Programming Guide);

    【讨论】:

    • 这并没有提供问题的答案。一旦你有足够的reputation,你就可以comment on any post;相反,provide answers that don't require clarification from the asker。 - From Review
    • 我不确定我是否理解您的观点。发帖人询问 Beam 是否有左连接的实现。我提供了一个指向 Beam SDK 中函数的链接,它的字面意思是 leftOuterJoin()。这怎么不回答这个问题?连接的这种特定实现可能无法满足作者的特定要求(问题中没有很多细节),因此评论看看它是否适合他。我没有要求澄清。
    • 以上是当您的答案被标记为“不提供答案”或“应该是评论”时发布的默认消息。此外,在您的情况下,答案看起来像“链接答案”,它链接到可能随时更改的外部资源。您需要在答案中包含重要的细节,这样如果链接被破坏,答案可能仍然提供足够的细节来回答问题。如果您点击我发布的自动消息中的最后一个链接并寻找“澄清”,它会说:通常,真正重要的信息应该包含在答案中
    • 我明白了。谢谢你的指导。稍微更新了响应。
    【解决方案2】:

    左外连接示例:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.joinlibrary.Join;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.junit.Test;
    
    public class TestJoin {
    
        @Test
        public void left_join_example() {
            Pipeline pipeline = Pipeline.create();
            PCollection<KV<Integer, String>> leftCollection = pipeline.apply(Create.of(KV.of(1, "a"), KV.of(2, "b"), KV.of(3, "c")));
            PCollection<KV<Integer, Integer>> rightCollection = pipeline.apply(Create.of(KV.of(1, 1), KV.of(1, 10), KV.of(2, 2), KV.of(4, 4)));
            PCollection<KV<Integer, KV<String, Integer>>> leftJoinCollection = Join.leftOuterJoin(leftCollection, rightCollection, 0);
    
            PAssert.that(leftJoinCollection).containsInAnyOrder(KV.of(1, KV.of("a" , 1)), KV.of(1, KV.of("a" , 10)),
                    KV.of(2, KV.of("b", 2)), KV.of(3, KV.of("c", 0)));
    
            pipeline.run().waitUntilFinish();
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-04-13
      • 1970-01-01
      • 1970-01-01
      • 2019-09-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多