【问题标题】:Strategy for loading data into BigQuery and Google cloud Storage from local disk将数据从本地磁盘加载到 BigQuery 和 Google Cloud Storage 的策略
【发布时间】:2016-12-17 09:02:47
【问题描述】:

我从 teradata 提取的本地磁盘中有 2 年大小约为 300GB 的组合数据。我必须将相同的数据加载到谷歌云存储和 BigQuery 表中。

谷歌云存储中的最终数据应按天以压缩格式分隔(每天的文件应为 gz 格式的单个文件)。 我还必须将 BigQuery 中的数据加载到按天分区的表中,即每天的数据应该存储在一个分区中。

我先将 2 年的合并数据加载到谷歌存储。然后尝试使用谷歌数据流通过使用数据流中的分区概念将数据进行日常隔离并将其加载到谷歌云存储(仅供参考数据流分区与大查询分区不同)。但是数据流不允许创建 730 个分区(2 年),因为它达到了 413 Request Entity Too Large(管道的序列化 JSON 表示的大小超过了允许的限制”)。

所以我运行了两次数据流作业,过滤了每年的数据。 它过滤了每一年的数据,并将其写入谷歌云存储中的单独文件,但由于数据流目前无法写入压缩文件,因此无法对其进行压缩。

看到第一种方法失败,我想到了使用上述数据流中的分区从组合数据中过滤一年的数据,并将其直接写入 BigQuery,然后以压缩格式将其导出到谷歌存储。这个过程会重复两次。 但是在这种方法中,我一次无法写入超过 45 天的数据,因为我反复遇到 java.lang.OutOfMemoryError: Java heap space issue。所以这个攻略也失败了

在制定以压缩格式和 BigQuery 以日期方式隔离迁移到谷歌存储的策略方面有什么帮助吗?

【问题讨论】:

  • (我删除了一个针对不同问题的答案,抱歉!)

标签: google-bigquery google-cloud-storage google-cloud-platform google-cloud-dataflow


【解决方案1】:

目前,对结果进行分区是生成多个输出文件/表的最佳方式。您可能会遇到这样一个事实,即每次写入都会为上传分配一个缓冲区,因此如果您有一个分区,然后是 N 次写入,那么就有 N 个缓冲区。

有两种策略可以完成这项工作。

  1. 您可以使用GcsOptions 中的uploadBufferSizeBytes 选项来减小上传缓冲区的大小。请注意,这可能会减慢上传速度,因为需要更频繁地刷新缓冲区。
  2. 您可以对分区后的每个PCollection 应用Reshuffle 操作。这将限制同时运行的并发 BigQuery 接收器的数量,因此将分配更少的缓冲区。

例如,您可以执行以下操作:

PCollection<Data> allData = ...;
PCollectionList<Data> partitions = allData.apply(Partition.of(...));

// Assuming the partitioning function has produced numDays partitions,
// and those can be mapped back to the day in some meaningful way:
for (int i = 0; i < numDays; i++) {
  String outputName = nameFor(i); // compute the output name
  partitions.get(i)
    .apply("Write_" + outputName, ReshuffleAndWrite(outputName));
}

这利用了这两个辅助 PTransforms:

private static class Reshuffle<T>
  extends PTransform<PCollection<T>, PCollection<T>> {
  @Override
  public PCollection<T> apply(PCollection<T> in) {
    return in
      .apply("Random Key", WithKeys.of(
          new SerializableFunction<T, Integer>() {
            @Override
            public Integer apply(Data value) {
              return ThreadLocalRandom.current().nextInt();
            }
          }))
      .apply("Shuffle", GroupByKey.<Integer, T>create())
      .apply("Remove Key", Values.create());
  }
}

private static class ReshuffleAndWrite 
  extends PTransform<PCollection<Data>, PDone> {

  private final String outputName;
  public ReshuffleAndWrite(String outputName) {
    this.outputName = outputName;
  }

  @Override
  public PDone apply(PCollection<Data> in) {
    return in
      .apply("Reshuffle", new Reshuffle<Data>())
      .apply("Write", BigQueryIO.Write.to(tableNameFor(outputName)
        .withSchema(schema)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
  }
}

【讨论】:

    【解决方案2】:

    让我们看看这是否有帮助?

    步骤+伪代码

    1 - 将组合数据 (300GB) 上传到 BigQuery 到 CombinedData 表

    2 - 分年(成本 1x2x300GB = 600GB)

    SELECT * FROM CombinedData WHERE year = year1 -> write to DataY1 table  
    SELECT * FROM CombinedData WHERE year = year2 -> write to DataY2 table  
    

    3 - 拆分为 6 个月(成本 2x2x150GB = 600GB)

    SELECT * FROM DataY1 WHERE month in (1,2,3,4,5,6) -> write to DataY1H1 table
    SELECT * FROM DataY1 WHERE month in (7,8,9,10,11,12) -> write to DataY1H2 table
    SELECT * FROM DataY2 WHERE month in (1,2,3,4,5,6) -> write to DataY2H1 table
    SELECT * FROM DataY2 WHERE month in (7,8,9,10,11,12) -> write to DataY2H2 table
    

    4 - 拆分为 3 个月(成本 4x2x75GB = 600GB)

    SELECT * FROM DataY1H1 WHERE month in (1,2,3) -> write to DataY1Q1 table
    SELECT * FROM DataY1H1 WHERE month in (4,5,6) -> write to DataY1Q2 table
    SELECT * FROM DataY1H2 WHERE month in (7,8,9) -> write to DataY1Q3 table
    SELECT * FROM DataY1H2 WHERE month in (10,11,12) -> write to DataY1Q4 table
    
    SELECT * FROM DataY2H1 WHERE month in (1,2,3) -> write to DataY2Q1 table
    SELECT * FROM DataY2H1 WHERE month in (4,5,6) -> write to DataY2Q2 table
    SELECT * FROM DataY2H2 WHERE month in (7,8,9) -> write to DataY2Q3 table
    SELECT * FROM DataY2H2 WHERE month in (10,11,12) -> write to DataY2Q4 table
    

    5 - 将每个季度分成 1 个月和 2 个月(成本 8x2x37.5GB = 600GB)

    SELECT * FROM DataY1Q1 WHERE month = 1 -> write to DataY1M01 table
    SELECT * FROM DataY1Q1 WHERE month in (2,3) -> write to DataY1M02-03 table
    SELECT * FROM DataY1Q2 WHERE month = 4 -> write to DataY1M04 table
    SELECT * FROM DataY1Q2 WHERE month in (5,6) -> write to DataY1M05-06 table  
    

    其余 Y(1/2)Q(1-4) 表相同

    6 - 将所有双月表拆分为单独的月表(成本 8x2x25GB = 400GB)

    SELECT * FROM DataY1M002-03 WHERE month = 2 -> write to DataY1M02 table
    SELECT * FROM DataY1M002-03 WHERE month = 3 -> write to DataY1M03 table
    SELECT * FROM DataY1M005-06 WHERE month = 5 -> write to DataY1M05 table
    SELECT * FROM DataY1M005-06 WHERE month = 6 -> write to DataY1M06 table
    

    其余 Y(1/2)M(XX-YY) 表相同

    7 - 最后你有 24 个每月牌桌,现在我希望你所面临的限制将消失,这样你就可以继续你的计划 - 比如说第二种方法 - 进一步拆分每日牌桌

    我认为,在成本方面,这是最优化的方法,最终查询成本是
    (假设计费层级 1)

    4x600GB + 400GB = 2800GB = $14 
    

    当然别忘了删除中间表

    注意:我对这个计划不满意 - 但如果无法将原始文件拆分为 BigQuery 之外的每日块 - 这会有所帮助

    【讨论】:

    • 基本上我在 teradata 中有 2 年的数据,我必须将这些数据加载到谷歌存储和 bigquery 中。但是为每天的数据运行 teradata 并行传输器并不是最佳的,我觉得它很慢,因为我们必须启动和停止从 teradata 提取数据 730 次。所以我一次提取了所有数据,现在我不知道如何进一步进行。你的方法会有所帮助。如果您有什么想法,请提供帮助:)
    • 目前我能想到的就这些了。试试吧
    • 另一方面,如果您已经在 teradata 中分区了这些数据 - 听起来最好不要将它组合起来,然后再拆分回来,因为知道它有多混乱。您可能想探索更多从 teradata 获取数据的选项 - 这可以带来回报
    猜你喜欢
    • 2018-03-28
    • 1970-01-01
    • 1970-01-01
    • 2013-05-24
    • 1970-01-01
    • 2017-11-13
    • 1970-01-01
    • 2018-04-30
    相关资源
    最近更新 更多