【问题标题】:Creating/Writing to Parititoned BigQuery table via Google Cloud Dataflow通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表
【发布时间】:2016-11-02 00:42:48
【问题描述】:

我想利用时间分区表的新 BigQuery 功能,但不确定当前是否可以在 1.6 版的 Dataflow SDK 中实现。

查看BigQuery JSON API,创建天分区表需要传入一个

"timePartitioning": { "type": "DAY" }

选项,但 com.google.cloud.dataflow.sdk.io.BigQueryIO 接口只允许指定 TableReference。

我想也许我可以预先创建表,然后通过 BigQueryIO.Write.toTableReference lambda 潜入分区装饰器......?是否还有其他人通过 Dataflow 创建/写入分区表成功?

这似乎与设置当前不可用的 table expiration time 类似。

【问题讨论】:

    标签: google-bigquery google-cloud-dataflow apache-beam-io


    【解决方案1】:

    我相信当您不使用流式传输时应该可以使用分区装饰器。我们正在积极致力于通过流媒体支持分区装饰器。如果您今天在非流媒体模式下发现任何错误,请告诉我们。

    【讨论】:

    • 嗨@Pavan,我们正在使用 BlockingDataflowPipelineRunner 并以批处理模式运行,但 BigQueryIO.Write 步骤失败,400 Bad Request"Table decorators cannot be used with streaming insert." 有没有办法不使用流式写入 BigQuery?我认为它实际上会进行批量加载。是否有支持流媒体模式的时间表?
    • 啊,看起来像一个表引用函数导致它进入流模式:(
    • 嗨@Pavan,在流式传输期间支持表格装饰器的任何时间表?
    • 希望在本月底之前
    【解决方案2】:

    正如 Pavan 所说,使用 Dataflow 写入分区表绝对是可能的。您使用的是在流模式还是批处理模式下运行的DataflowPipelineRunner

    您提出的解决方案应该可行。具体来说,如果您预先创建了一个设置了日期分区的表,那么您可以使用BigQueryIO.Write.toTableReference lambda 来写入日期分区。例如:

    /**
     * A Joda-time formatter that prints a date in format like {@code "20160101"}.
     * Threadsafe.
     */
    private static final DateTimeFormatter FORMATTER =
        DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);
    
    // This code generates a valid BigQuery partition name:
    Instant instant = Instant.now(); // any Joda instant in a reasonable time range
    String baseTableName = "project:dataset.table"; // a valid BigQuery table name
    String partitionName =
        String.format("%s$%s", baseTableName, FORMATTER.print(instant));
    

    【讨论】:

    • 这个方法很好,但是它只允许在管道之外用参数控制日期戳。如果我们想使用数据本身的时间戳来按日期拆分它们,然后写入相应的表中怎么办?
    • @nembleton :如果元素有时间戳,您可以使用窗口将它们映射到每日窗口中。修改此代码:PCollection<Integer> windowedItems = items.apply( Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(10))));。然后读取窗口的 TableSpecFun 会将元素映射到正确的日期。代码来自FixedWindows javadoc
    • 感谢@DanHalperin,这几乎就是我正在做的事情,包括窗口,但使用.apply(Window.into(CalendarWindows.days(1))) 唯一的问题是因为数据可以位于不同的时区,我们希望 BigQuery 在原始时区,我们在早期的 PTransform 中通过 outputWithTimestamp 调用做了一些有趣的事情
    • @JulianV.Modesto 是对的,如果提供了表引用,1.6 SDK 会切换到以流模式写入 BigQuery .. 这还不允许表装饰器
    • 使用BigQuery的流式写入API,我相信是正确的。
    【解决方案3】:

    我采用的方法(也适用于流模式):

    • 为传入记录定义自定义窗口
    • 将窗口转换为表/分区名称

      p.apply(PubsubIO.Read
                  .subscription(subscription)
                  .withCoder(TableRowJsonCoder.of())
              )
              .apply(Window.into(new TablePartitionWindowFn()) )
              .apply(BigQueryIO.Write
                             .to(new DayPartitionFunc(dataset, table))
                             .withSchema(schema)
                             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
              );
      

    根据传入的数据设置窗口,可以忽略End Instant,因为开始值用于设置分区:

    public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
    
    private IntervalWindow assignWindow(AssignContext context) {
        TableRow source = (TableRow) context.element();
        String dttm_str = (String) source.get("DTTM");
    
        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();
    
        Instant start_point = Instant.parse(dttm_str,formatter);
        Instant end_point = start_point.withDurationAdded(1000, 1);
    
        return new IntervalWindow(start_point, end_point);
    };
    
    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }
    
    @Override
    public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
        return Arrays.asList(assignWindow(c));
    }
    
    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        return false;
    }
    
    @Override
    public IntervalWindow getSideInputWindow(BoundedWindow window) {
        if (window instanceof GlobalWindow) {
            throw new IllegalArgumentException(
                    "Attempted to get side input window for GlobalWindow from non-global WindowFn");
        }
        return null;
    }
    

    动态设置表分区:

    public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {
    
    String destination = "";
    
    public DayPartitionFunc(String dataset, String table) {
        this.destination = dataset + "." + table+ "$";
    }
    
    @Override
    public String apply(BoundedWindow boundedWindow) {
        // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
        String dayString = DateTimeFormat.forPattern("yyyyMMdd")
                                         .withZone(DateTimeZone.UTC)
                                         .print(((IntervalWindow) boundedWindow).start());
        return destination + dayString;
    }}
    

    有没有更好的方法来达到同样的结果?

    【讨论】:

    • 您使用哪个版本的 Apache Beam 库来设置上述数据流?
    【解决方案4】:

    Apache Beam 2.0 版支持分片 BigQuery 输出表out of the box

    【讨论】:

      【解决方案5】:

      如果您以table_name_YYYYMMDD 格式传递表名,那么BigQuery 会将其视为分片表,可以模拟分区表功能。 参考文档:https://cloud.google.com/bigquery/docs/partitioned-tables

      【讨论】:

      • 错了! BigQuery 会将其视为常规表!唯一能让您认为 BigQuery 以某种方式特别对待此类表的原因是因为 BigQuery UI 将此类表组合在一个条目 table_name (NN) 下,但除此之外,它仅取决于用户了解此类命名背后的含义,并且表未分区基于名称
      • @MikhailBerlyant,是的,它不会是分区表,但它会创建一个可以模拟分区表功能的分表。这是最后的手段,直到梁提供将分区列作为参数传递的选项。
      • 所以,至少你更新答案的方式现在没有那么错误:o)
      • 你也需要使用 $ 符号
      【解决方案6】:

      我已通过数据流将数据写入 bigquery 分区表。如果该分区中的数据已经存在,那么这些写入是动态的,那么我可以附加到它或覆盖它。

      我已经用 Python 编写了代码。这是对 bigquery 的批处理模式写入操作。

      client = bigquery.Client(project=projectName)
      dataset_ref = client.dataset(datasetName)
      table_ref = dataset_ref.table(bqTableName)       
      job_config = bigquery.LoadJobConfig()
      job_config.skip_leading_rows = skipLeadingRows
      job_config.source_format = bigquery.SourceFormat.CSV
      if tableExists(client, table_ref):            
          job_config.autodetect = autoDetect
          previous_rows = client.get_table(table_ref).num_rows
          #assert previous_rows > 0
          if allowJaggedRows is True:
              job_config.allowJaggedRows = True
          if allowFieldAddition is True:
              job_config._properties['load']['schemaUpdateOptions'] = ['ALLOW_FIELD_ADDITION']
          if isPartitioned is True:
              job_config._properties['load']['timePartitioning'] = {"type": "DAY"}
          if schemaList is not None:
              job_config.schema = schemaList            
          job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
      else:            
          job_config.autodetect = autoDetect
          job_config._properties['createDisposition'] = 'CREATE_IF_NEEDED'
          job_config.schema = schemaList
          if isPartitioned is True:             
              job_config._properties['load']['timePartitioning'] = {"type": "DAY"}
          if schemaList is not None:
              table = bigquery.Table(table_ref, schema=schemaList)            
      load_job = client.load_table_from_uri(gcsFileName, table_ref, job_config=job_config)        
      assert load_job.job_type == 'load'
      load_job.result()       
      assert load_job.state == 'DONE'
      

      效果很好。

      【讨论】:

        猜你喜欢
        • 2015-12-14
        • 1970-01-01
        • 2019-03-07
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-01-01
        相关资源
        最近更新 更多