【发布时间】:2019-12-21 13:26:42
【问题描述】:
我正在使用 Apache Beam 从 PubSub 读取消息并将它们写入 BigQuery。我要做的是根据输入中的信息写入多个表。为了减少写入量,我对来自 PubSub 的输入使用窗口化。
一个小例子:
messages
.apply(new PubsubMessageToTableRow(options))
.get(TRANSFORM_OUT)
.apply(ParDo.of(new CreateKVFromRow())
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))))
// group by key
.apply(GroupByKey.create())
// Are these two rows what I want?
.apply(Values.create())
.apply(Flatten.iterables())
.apply(BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input -> {
// Simplified for readability
Integer destination = (Integer) input.getValue().get("key");
return new TableDestination(
new TableReference()
.setProjectId(options.getProjectID())
.setDatasetId(options.getDatasetID())
.setTableId(destination + "_Table"),
"Table Destination");
}));
我在文档中找不到任何内容,但我想知道每个窗口执行了多少次写入?如果这些是多个表,是否为窗口中所有元素的每个表写入一个?还是每个元素一次,因为每个表可能因每个元素而异?
【问题讨论】:
标签: java google-bigquery google-cloud-dataflow apache-beam