【发布时间】:2018-07-15 13:44:08
【问题描述】:
我想使用 Cloud Dataflow、PubSub 和 Bigquery 将 tableRow 写入 Pubsub 消息,然后将它们写入 Bigquery。
我希望表名、项目 ID 和数据集 ID 是动态的。
我在互联网上看到以下代码,我无法理解如何传递数据行参数。
public void PubSub(String projectId , String datasetId,String tableId,String topicId)
PipelineOptions options = PipelineOptionsFactory.create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setStreaming(true);
Pipeline pipeline = Pipeline.create(dataflowOptions);
PCollection<TableRow> input = pipeline.apply(PubsubIO.Read.topic(createTopic(projectId,topicId).getName()).withCoder(TableRowJsonCoder.of()))
.apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));
input.apply(BigQueryIO.Write.to(getTableReference(projectId,datasetId, tableId)).withSchema(getSchema()));
pipeline.run();
}
private static TableReference getTableReference(String projectId , String datasetId,String tableId) {
TableReference tableRef = new TableReference();
tableRef.setProjectId(projectId);
tableRef.setDatasetId(datasetId);
tableRef.setTableId(tableId);
return tableRef;
}
提前致谢, 加尔
【问题讨论】:
标签: java google-bigquery google-cloud-dataflow publish-subscribe google-cloud-pubsub