【问题标题】:Overwrite some partitions of a partitioned table Bigquery覆盖分区表 Bigquery 的一些分区
【发布时间】:2018-11-22 01:36:50
【问题描述】:

我目前正在尝试开发 Dataflow 管道以替换分区表的某些分区。我有一个自定义分区字段,它是一个日期。我的管道的输入是一个日期可能不同的文件。

我开发了一个管道:

    PipelineOptionsFactory.register(BigQueryOptions.class);
    BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

    Pipeline p = Pipeline.create(options);

    PCollection<TableRow> rows =  p.apply("ReadLines", TextIO.read().from(options.getFileLocation()))
            .apply("Convert To BQ Row", ParDo.of(new StringToRowConverter(options)));



    ValueProvider<String>  projectId = options.getProjectId();
    ValueProvider<String> datasetId = options.getDatasetId();
    ValueProvider<String> tableId = options.getTableId();
    ValueProvider<String> partitionField = options.getPartitionField();
    ValueProvider<String> columnNames = options.getColumnNames();
    ValueProvider<String> types = options.getTypes();

    rows.apply("Write to BQ", BigQueryIO.writeTableRows()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withCustomGcsTempLocation(options.getGCSTempLocation())
            .to(new DynamicDestinations<TableRow, String>() {

                @Override
                public String getDestination(ValueInSingleWindow<TableRow> element) {

                    TableRow date = element.getValue();

                    String partitionDestination = (String) date.get(partitionField.get());

                    SimpleDateFormat from = new SimpleDateFormat("yyyy-MM-dd");
                    SimpleDateFormat to = new SimpleDateFormat("yyyyMMdd");

                    try {

                        partitionDestination = to.format(from.parse(partitionDestination));
                        LOG.info("Table destination "+partitionDestination);
                        return projectId.get()+":"+datasetId.get()+"."+tableId.get()+"$"+partitionDestination;

                    } catch(ParseException e){
                        e.printStackTrace();
                        return projectId.get()+":"+datasetId.get()+"."+tableId.get()+"_rowsWithErrors";
                    }
                }

                @Override
                public TableDestination getTable(String destination) {

                    TimePartitioning timePartitioning = new TimePartitioning();
                    timePartitioning.setField(partitionField.get());
                    timePartitioning.setType("DAY");
                    timePartitioning.setRequirePartitionFilter(true);

                    TableDestination tableDestination  = new TableDestination(destination, null, timePartitioning);

                    LOG.info(tableDestination.toString());

                    return tableDestination;

                }

                @Override
                public TableSchema getSchema(String destination) {

                        return new TableSchema().setFields(buildTableSchemaFromOptions(columnNames, types));
                }
            })
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

    p.run();
}

当我在本地触发管道时,它成功替换了输入文件中日期的分区。然而,当在 Google Cloud Dataflow 上部署并使用完全相同的参数运行模板时,它会截断所有数据,最后我的表中只有我想要上传的文件。

你知道为什么会有这样的差异吗?

谢谢!

【问题讨论】:

  • 本地运行和云端运行应该没有区别。你确定你所描述的事情正在发生吗?
  • 你好,格雷厄姆,谢谢你的回答,是的,我确定:我从我的梁代码生成了一个模板,当我使用完全相同的参数运行它时,它会覆盖我所有的分区。
  • @GrahamPolley,我也尝试使用 Dataflow 运行器启动管道(而不是生成模板),它仍然会覆盖所有分区
  • 可能是因为您使用的是.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 而不是.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND
  • 您好@HarisNadeem,用例是这样的:我有一个分区表,假设我在以下日期有 3 个分区:2018-05-01、2018-05-02, 2018-05-03。在 t+1,我有一个输入文件,其中包含 2018-05-02、2018-05-03 的数据。我想要做的是替换这些日期的当前分区,并保持 2018-05-01 不变。当我执行使用直接运行器开发的管道时,一切正常。但是,当我使用 DataflowRunner 触发它时,它会覆盖所有分区,而我的输出中只有 2 个分区。

标签: java google-bigquery apache-beam


【解决方案1】:

您将 Bi​​gQueryIO.Write.CreateDisposition 指定为 CREATE_IF_NEEDED,这与 BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE 配对,因此即使表存在,也可以重新创建。这就是您看到您的桌子被替换的原因。

有关详细信息,请参阅此文档 [1]。

[1]https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write.CreateDisposition#CREATE_IF_NEEDED

【讨论】:

    猜你喜欢
    • 2020-05-31
    • 1970-01-01
    • 2019-09-15
    • 2018-10-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多