【问题标题】:Apache beam : Programatically create partitioned tablesApache Beam:以编程方式创建分区表
【发布时间】:2017-10-31 13:07:38
【问题描述】:

我正在编写一个从 Pubsub 读取消息并将其存储到 BigQuery 的云数据流。我想使用分区表(按日期)并且我使用与消息关联的Timestamp 来确定消息应该进入哪个分区。以下是我的代码:

      BigQueryIO.writeTableRows()
        .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
            private static final long serialVersionUID = 1L;

            @Override
              public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                log.info("Row value : {}", value.getValue());
                Instant timestamp = value.getTimestamp();
                String partition = DateTimeFormat.forPattern("yyyyMMdd").print(timestamp);
                TableDestination td = new TableDestination(
                    "<project>:<dataset>.<table>" + "$" + partition, null);
                log.info("Table Destination : {}", td);
                return td;
              }
          })            
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)         
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
    .withSchema(tableSchema);

部署数据流时,我可以在 Stackdriver 中看到日志语句,但是消息没有插入到 BigQuery 表中,并且出现以下错误:

Request failed with code 400, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables
severity:  "WARNING"  

所以,它看起来无法创建表,导致插入失败。我是否需要更改数据流定义才能使其正常工作?如果没有,还有其他方法可以通过编程方式创建分区表吗?

我正在使用 Apache Beam 2.0.0。

【问题讨论】:

    标签: google-bigquery partitioning google-cloud-dataflow apache-beam


    【解决方案1】:

    这是a bug in BigQueryIO,它已在 Beam 2.2 中修复。您可以使用 Beam 的快照版本,或等到 2.2 版最终确定(发布过程目前正在进行中)。

    【讨论】:

    • 感谢您的更新。几个后续问题(抱歉):1. 有没有其他方法可以在数据流管道中创建表(除了使用 apache beam)和 2. 我们预计 beam 2.2 什么时候发布?
    • 您可以直接使用 BigQuery API cloud.google.com/bigquery/docs/reference/libraries 创建表。 Beam 发布是一个 Apache 社区进程,因此不能有硬性保证,但我认为它似乎可能会在接下来的一两周内发生。您可以关注线程lists.apache.org/thread.html/…
    • 感谢您的意见@jkff。
    • 将库升级到 2.2.0。现在我们使用相同的代码收到以下错误:Cannot read partition information from a table that is not partitioned:。那么,它看起来像是在创建没有分区的表?
    • 能否分享错误的完整堆栈跟踪和数据流作业 ID?
    猜你喜欢
    • 2014-03-09
    • 1970-01-01
    • 2015-12-05
    • 1970-01-01
    • 2013-06-28
    • 2023-03-23
    • 2015-02-25
    • 1970-01-01
    相关资源
    最近更新 更多