【发布时间】:2018-07-20 11:52:17
【问题描述】:
我在从 DoFn 中的 bigquery 表中检索数据时遇到问题。我找不到从 TypedRead 中提取值的示例。
这是一个简化的管道。我想检查 bigquery 表中是否存在目标 SSN 的记录。目标 SSN 将在实际管道中通过 pubsub 接收,我已将其替换为字符串数组。
final BigQueryIoTestOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryIoTestOptions.class);
final List<String> SSNs = Arrays.asList("775-89-3939");
Pipeline p = Pipeline.create(options);
PCollection<String> ssnCollection = p.apply("GetSSNParams", Create.of(SSNs)).setCoder(StringUtf8Coder.of());
ssnCollection.apply("SelectFromBQ", ParDo.of(new DoFn<String, TypedRead<TableRow>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TypedRead<TableRow> tr =
BigQueryIO.readTableRows()
.fromQuery("SELECT pid19PatientSSN FROM dataset.table where pid19PatientSSN = '" + c.element() + "' LIMIT 1");
c.output(tr);
}
}))
.apply("ParseResponseFromBigQuery", ParDo.of(new DoFn<TypedRead<TableRow>, Void>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
System.out.println(c.element().toString());
}
}));
p.run();
【问题讨论】:
-
您是否收到错误或根本没有输出?它是否在本地运行(即
DirectRunner),因为您正在执行println?
标签: google-bigquery google-cloud-dataflow apache-beam