【问题标题】:How to join BQ tables on two or more keys with Cloud Dataflow?如何使用 Cloud Dataflow 在两个或多个键上加入 BQ 表?
【发布时间】:2018-10-15 00:04:20
【问题描述】:
我有两个表 A 和 B。它们都有字段 session_id 和 cookie_id。
如何借助 Dataflow 管道在 session_id、cookie_id 上创建连接 A 和 B 的连接表输出? CoGroupByKey 方法允许您加入单个键。在文档中也找不到任何有用的东西。
【问题讨论】:
标签:
java
google-cloud-platform
google-bigquery
google-cloud-dataflow
【解决方案1】:
扩展 user9720010 的答案。您可以通过将字段映射到session_id 和cookie_id 的组合来创建复合键。此模式在常见 Dataflow 用例模式 blog 中进行了说明。假设您使用的是 BigQuery,您可以执行类似以下操作:
Pipeline pipeline = Pipeline.create(options);
// Create tuple tags for the value types in each collection.
final TupleTag<TableRow> table1Tag = new TupleTag<>();
final TupleTag<TableRow> table2Tag = new TupleTag<>();
// Transform for keying table rows by session_id and cookie_id
WithKeys<String, TableRow> sessionAndCookieKeys = WithKeys.of(
(TableRow row) ->
String.format("%s#%s",
row.get("session_id"),
row.get("cookie_id")))
.withKeyType(TypeDescriptors.strings());
/*
* Steps:
* 1) Read table 1's rows
* 2) Read table 2's rows
* 3) Map each row to a composite key
* 4) Join on the composite key
* 5) Process the results
*/
PCollection<KV<String, TableRow>> table1Rows = pipeline
.apply(
"ReadTable1",
BigQueryIO
.readTableRows()
.from(options.getTable1()))
.apply("WithKeys", sessionAndCookieKeys);
PCollection<KV<String, TableRow>> table2Rows = pipeline
.apply(
"ReadTable2",
BigQueryIO
.readTableRows()
.from(options.getTable2()))
.apply("WithKeys", sessionAndCookieKeys);
//Merge collection values into a CoGbkResult collection
PCollection<KV<String, CoGbkResult>> coGbkResult = KeyedPCollectionTuple
.of(table1Tag, table1Rows)
.and(table2Tag, table2Rows)
.apply("JoinOnSessionAndCookie", CoGroupByKey.create());
// Process the results
coGbkResult.apply(
"ProcessResults",
ParDo.of(new DoFn<KV<String, CoGbkResult>, Object>() {
@ProcessElement
public void processElement(ProcessContext context) {
// Do something here
}
}));
【解决方案2】:
在这种情况下,我采用的一种方法是创建一个临时密钥,它是两个密钥的组合。
读取数据后,在转换为键值对时,我会将 session_id$cookie_id 输出为单个连接字符串。这里 $ 可以是任何不构成两个键字符集的分隔符。分隔符也可以忽略。