【Question title】: BigQueryIO returns TypedRead<TableRow> instead of PCollection<TableRow>. How to get the real data?
【Posted】: 2018-07-20 11:52:17
【Question】:

I'm having trouble retrieving data from a BigQuery table inside a DoFn. I can't find an example of extracting the values from a TypedRead.

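Here is a simplified pipeline. I want to check whether the BigQuery table contains a record for the target SSN. In the real pipeline the target SSNs arrive via Pub/Sub; here I've replaced that with an array of strings.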

final BigQueryIoTestOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryIoTestOptions.class);

final List<String> SSNs = Arrays.asList("775-89-3939");

Pipeline p = Pipeline.create(options);

PCollection<String> ssnCollection = p.apply("GetSSNParams", Create.of(SSNs)).setCoder(StringUtf8Coder.of());

ssnCollection.apply("SelectFromBQ", ParDo.of(new DoFn<String, TypedRead<TableRow>>() {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    TypedRead<TableRow> tr =
        BigQueryIO.readTableRows()
            .fromQuery("SELECT pid19PatientSSN FROM dataset.table where pid19PatientSSN = '" + c.element() + "' LIMIT 1");

    c.output(tr);
  }
}))
.apply("ParseResponseFromBigQuery", ParDo.of(new DoFn<TypedRead<TableRow>, Void>() {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    System.out.println(c.element().toString());
  }
}));

p.run();

【Question comments】:

  • Are you getting an error, or no output at all? Is it running locally (i.e. DirectRunner), given that you're using println?

Tags: google-bigquery google-cloud-dataflow apache-beam


【Answer 1】:

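BigQueryIO only gives you a PCollection once its read transform is applied to the pipeline. You can then get the results as an entry set, as in the example below, or deserialize them into your own objects, as mentioned here.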
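For the "deserialize into objects" route, Beam's `BigQueryIO.read(SerializableFunction<SchemaAndRecord, T>)` lets you map each Avro record to a type other than `TableRow`. The sketch below is an illustration, not the approach from the linked page: the class name `ReadSsnsAsStrings` and the query are hypothetical, and the table and column names are borrowed from the question. Note that the transform is still applied to the pipeline root, not inside a DoFn.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical example: read one column as plain Strings instead of TableRows. */
public class ReadSsnsAsStrings {
  // Assumed query; `dataset.table` and pid19PatientSSN come from the question.
  static final String QUERY = "SELECT pid19PatientSSN FROM `dataset.table` LIMIT 10";

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    // read(parseFn) converts each Avro GenericRecord into a String;
    // String.valueOf avoids a NullPointerException on NULL columns.
    PCollection<String> ssns = p.apply("ReadSSNs",
        BigQueryIO.read((SchemaAndRecord sr) -> String.valueOf(sr.getRecord().get("pid19PatientSSN")))
            .fromQuery(QUERY)
            .usingStandardSql()
            .withCoder(StringUtf8Coder.of()));

    // Downstream transforms can consume `ssns` like any other PCollection.
    p.run().waitUntilFinish();
  }
}
```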

If you want to query BigQuery in the middle of a pipeline, use the BigQuery client library instead of BigQueryIO, as mentioned here.

BigQueryIO example:

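import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

// Applying the read transform to the pipeline (not inside a DoFn) yields a PCollection.
// The bracketed table name is legacy SQL, which fromQuery() uses unless usingStandardSql() is set.
PCollection<TableRow> result = pipeline.apply(
    BigQueryIO.readTableRows()
        .fromQuery("SELECT id, name FROM [project-test:test_data.test] LIMIT 1"));

// Print each row, then each of its fields (TableRow is a Map<String, Object>).
result.apply(MapElements.via(new SimpleFunction<TableRow, Void>() {
  @Override
  public Void apply(TableRow row) {
    System.out.println("***" + row);
    row.entrySet().forEach(e -> System.out.println(e.getKey() + " :" + e.getValue()));
    return null;
  }
}));

pipeline.run().waitUntilFinish();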

BigQuery client library example:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import java.util.UUID;

// waitFor() and getQueryResults() throw InterruptedException,
// so this code must run in a method that declares or handles it.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
            "SELECT "
                + "CONCAT('https://stackoverflow.com/questions/', CAST(id as STRING)) as url, "
                + "view_count "
                + "FROM `bigquery-public-data.stackoverflow.posts_questions` "
                + "WHERE tags like '%google-bigquery%' "
                + "ORDER BY favorite_count DESC LIMIT 10")
        // Use standard SQL syntax for queries.
        // See: https://cloud.google.com/bigquery/sql-reference/
        .setUseLegacySql(false)
        .build();

// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

// Wait for the query to complete.
queryJob = queryJob.waitFor();

// Check for errors.
if (queryJob == null) {
  throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
  // You can also look at queryJob.getStatus().getExecutionErrors() for all
  // errors, not just the latest one.
  throw new RuntimeException(queryJob.getStatus().getError().toString());
}

// Get the results.
TableResult result = queryJob.getQueryResults();

// Print all pages of the results.
for (FieldValueList row : result.iterateAll()) {
  String url = row.get("url").getStringValue();
  long viewCount = row.get("view_count").getLongValue();
  System.out.printf("url: %s views: %d%n", url, viewCount);
}
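To use the client library in the middle of a pipeline, as the question needs, one option is to wrap a query in a DoFn. The sketch below assumes the table and column from the question; the class name `SsnExistsFn` and its `KV<String, Boolean>` output are hypothetical. It passes the SSN as a named query parameter rather than concatenating it into the SQL, and creates the (non-serializable) client in `@Setup`. Running one query per element can be slow and costly on large or streaming inputs, so treat this as a starting point.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.TableResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Hypothetical DoFn: emits (ssn, true) if the table has a matching row, else (ssn, false). */
public class SsnExistsFn extends DoFn<String, KV<String, Boolean>> {
  // Assumed table and column names, taken from the question; @ssn is a named parameter.
  static final String QUERY =
      "SELECT 1 FROM `dataset.table` WHERE pid19PatientSSN = @ssn LIMIT 1";

  // The client is not serializable, so it is created once per DoFn instance.
  private transient BigQuery bigquery;

  @Setup
  public void setup() {
    bigquery = BigQueryOptions.getDefaultInstance().getService();
  }

  @ProcessElement
  public void processElement(@Element String ssn, OutputReceiver<KV<String, Boolean>> out)
      throws InterruptedException {
    QueryJobConfiguration config =
        QueryJobConfiguration.newBuilder(QUERY)
            .addNamedParameter("ssn", QueryParameterValue.string(ssn))
            .setUseLegacySql(false) // named parameters require standard SQL
            .build();
    // query() runs the job and blocks until the results are available.
    TableResult result = bigquery.query(config);
    out.output(KV.of(ssn, result.getTotalRows() > 0));
  }
}
```

With the question's pipeline, this would be applied as `ssnCollection.apply("CheckSSN", ParDo.of(new SsnExistsFn()))`, replacing both of the original DoFns.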

【Comments】:

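  • Your example doesn't address the question, which is about placing BigQueryIO in the middle of a pipeline. Your case works fine; Google uses it in its own examples.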