【发布时间】:2019-03-05 06:44:26
【问题描述】:
Dataflow - BigQuery 是否有窗口写入?我正在尝试运行读取 5 亿行文件,然后写入 BigQuery 的 Dataflow 作业。 当我运行时,它没有超过 1500 万,因此查看是否有任何类型的 Windowing 写入 BigQuery 会有所帮助。在运行时,我遇到了许多 GC 分配失败,但我看到这些都是正常的。我在运行时保留了默认的 diskSize 配置。请帮忙。如果有任何窗口写入 BigQuery 的示例,请提供。
至于转换,只是对字符串进行拆分,然后插入到 BigQuery 中。
另外,下面的示例是否在不断从 PubSub 流式传输时不断写入 BigQuery? https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubSubToBigQuery.java
下面是我的示例
Pipeline pipeline = Pipeline.create(options);
PCollection<String> textData = pipeline.apply("Read Text Data",
TextIO.read().from(options.getInputFilePattern()));
PCollection<TableRow> tr = textData.apply(ParDo.of(new FormatRemindersFn()));
tr.apply(BigQueryIO.writeTableRows().withoutValidation() .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
.withSchema(FormatRemindersFn.getSchema())
// .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.to(options.getSchemaDetails()));
static class FormatRemindersFn extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
try {
if (StringUtils.isNotEmpty(c.element())) {
String[] fields = c.element().split("\\^",15);
// logger.info("Fields :{}", fields[2]);
TableRow row = new TableRow().set("MODIFIED_DATE", fields[0])
.set("NAME", fields[1])
.set("ADDRESS", fields[2]);
c.output(row);
}
} catch (Exception e) {
logger.error("Error: {}", e.getMessage());
}
}
}
【问题讨论】:
-
500M 对于数据流来说应该很容易。你能确认你正在阅读 GCS 吗?多少个文件?您是否在 15M 时遇到错误?
-
在注释了作为每个元素的 DoFn 的一部分完成的日志记录后,错误得到了解决。
-
太棒了。您能否为后代提供自己的答案。
标签: google-bigquery google-cloud-dataflow dataflow