【问题标题】:Cloud Dataflow, PubSub & Bigquery IssuesCloud Dataflow、PubSub 和 Bigquery 问题
【发布时间】:2018-07-15 13:44:08
【问题描述】:

我想使用 Cloud Dataflow、PubSub 和 Bigquery 将 tableRow 写入 Pubsub 消息,然后将它们写入 Bigquery。 我希望表名、项目 ID 和数据集 ID 是动态的。
我在互联网上看到以下代码,我无法理解如何传递数据行参数。

public void PubSub(String projectId , String datasetId,String tableId,String topicId)       
    PipelineOptions options = PipelineOptionsFactory.create();
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    dataflowOptions.setStreaming(true);
    Pipeline pipeline = Pipeline.create(dataflowOptions);
    PCollection<TableRow> input = pipeline.apply(PubsubIO.Read.topic(createTopic(projectId,topicId).getName()).withCoder(TableRowJsonCoder.of()))
            .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));

    input.apply(BigQueryIO.Write.to(getTableReference(projectId,datasetId, tableId)).withSchema(getSchema()));

    pipeline.run();
}


private static TableReference getTableReference(String projectId , String datasetId,String tableId) {
      TableReference tableRef = new TableReference();
      tableRef.setProjectId(projectId);
      tableRef.setDatasetId(datasetId);
      tableRef.setTableId(tableId);
      return tableRef;
}

提前致谢, 加尔

【问题讨论】:

    标签: java google-bigquery google-cloud-dataflow publish-subscribe google-cloud-pubsub


    【解决方案1】:

    BigQueryIO.Write 转换不支持动态输出。但是您可以直接从 DoFn 进行 BigQuery API 调用。

    这样,您可以将表名称设置为您想要的任何名称,由您的代码计算得出。这可以从侧面输入中查找,或者直接从 DoFn 当前正在处理的元素中计算出来。

    为避免对 BigQuery 进行过多的小调用,您可以使用 finishBundle(); 批量处理请求

    我不完全明白您是否要将 Dataflow 写入 Pub/Sub,然后将 Pub/Sub 写入 BigQuery?您可以直接写入 BigQuery 而不使用 Pub/Sub。

    【讨论】:

      猜你喜欢
      • 2017-06-08
      • 2015-12-14
      • 2020-01-23
      • 1970-01-01
      • 2019-04-21
      • 2019-03-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多