【问题标题】:How to join BQ tables on two or more keys with Cloud Dataflow?如何使用 Cloud Dataflow 在两个或多个键上加入 BQ 表?
【发布时间】:2018-10-15 00:04:20
【问题描述】:

我有两个表 A 和 B。它们都有字段 session_idcookie_id。 如何借助 Dataflow 管道在 session_idcookie_id 上创建连接 A 和 B 的连接表输出? CoGroupByKey 方法允许您加入单个键。在文档中也找不到任何有用的东西。

【问题讨论】:

    标签: java google-cloud-platform google-bigquery google-cloud-dataflow


    【解决方案1】:

    扩展 user9720010 的答案。您可以通过将字段映射到session_idcookie_id 的组合来创建复合键。此模式在常见 Dataflow 用例模式 blog 中进行了说明。假设您使用的是 BigQuery,您可以执行类似以下操作:

    Pipeline pipeline = Pipeline.create(options);
    
    // Create tuple tags for the value types in each collection.
    final TupleTag<TableRow> table1Tag = new TupleTag<>();
    final TupleTag<TableRow> table2Tag = new TupleTag<>();
    
    // Transform for keying table rows by session_id and cookie_id
    WithKeys<String, TableRow> sessionAndCookieKeys = WithKeys.of(
        (TableRow row) ->
            String.format("%s#%s",
                row.get("session_id"),
                row.get("cookie_id")))
        .withKeyType(TypeDescriptors.strings());
    
    /*
     * Steps:
     *  1) Read table 1's rows
     *  2) Read table 2's rows
     *  3) Map each row to a composite key
     *  4) Join on the composite key
     *  5) Process the results
     */
    PCollection<KV<String, TableRow>> table1Rows = pipeline
        .apply(
            "ReadTable1",
            BigQueryIO
                .readTableRows()
                .from(options.getTable1()))
        .apply("WithKeys", sessionAndCookieKeys);
    
    PCollection<KV<String, TableRow>> table2Rows = pipeline
        .apply(
            "ReadTable2",
            BigQueryIO
                .readTableRows()
                .from(options.getTable2()))
        .apply("WithKeys", sessionAndCookieKeys);
    
    //Merge collection values into a CoGbkResult collection
    PCollection<KV<String, CoGbkResult>> coGbkResult = KeyedPCollectionTuple
        .of(table1Tag, table1Rows)
        .and(table2Tag, table2Rows)
        .apply("JoinOnSessionAndCookie", CoGroupByKey.create());
    
    // Process the results
    coGbkResult.apply(
        "ProcessResults", 
        ParDo.of(new DoFn<KV<String, CoGbkResult>, Object>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            // Do something here
          }
        }));
    

    【讨论】:

      【解决方案2】:

      在这种情况下,我采用的一种方法是创建一个临时密钥,它是两个密钥的组合。 读取数据后,在转换为键值对时,我会将 session_id$cookie_id 输出为单个连接字符串。这里 $ 可以是任何不构成两个键字符集的分隔符。分隔符也可以忽略。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-03-04
        • 1970-01-01
        • 1970-01-01
        • 2011-07-26
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多