【问题标题】:Loading csv with non-standard separator into BQ将带有非标准分隔符的 csv 加载到 BQ 中
【发布时间】:2019-07-05 10:38:39
【问题描述】:

假设我在 csv 文件中有以下数据:

'"tom","jones","hello,\nMy name is tom"\x01\n"sarah","smith","hello"\x01\n'

行终止符是\x01\n。是否可以将其直接加载到 GCS 中(无需先对其进行预格式化)?我的思考过程是:

  • 使用非标准分隔符(例如\x00ff)将其加载到 CSV 中,以便在一行中获取所有数据。
  • 然后执行基本的 DML 以“清理”数据并重新格式化。

但是,当我们有连续行时,我们会遇到一个问题,因为 BQ 不“支持”(如果您想这样称呼它)行排序。这是我的数据现在在 BQ 中的样子:

我们可以看到行排序不起作用,因此不可能“重新组合数据”,例如,使用 UDF 来获得我们需要的正确 csv 数据。

这里还有其他可能的方法吗?澄清一下,我希望通过 BigQuery 转换已在 GCS 上的 CSV 文件,而无需在加载到 BQ 之前将该文件下载到单独的服务器进行处理。


作为参考,这是我目前正在使用的代码:

# /tmp/schema_external_nonstandard_csv.json
{
  "schema": {
    "fields": [
      {
        "name": "data",
        "type": "STRING"
      }
    ]
  },
  "sourceFormat": "CSV",
  "sourceUris": [
    "gs://XY-bq/ns.csv"
  ],
  "csvOptions": {
    "fieldDelimiter": "\u00ff",
    "quote": ""
  },
  "maxBadRecords": 1000000
}

$ bq mk --external_table_definition=/tmp/schema_external_nonstandard_csv.json datadocs-163219:bqtesting.ns
$ bq query --nouse_legacy_sql 'CREATE TABLE `XY-163219.bqtesting.ns1` AS select * from `XY-163219.bqtesting.ns`'

【问题讨论】:

  • 确保回答您的第一个问题。
  • @ElliottBrossard 你能澄清一下吗?我回答了。
  • 啊,对不起哈哈。我没有意识到你已经回答了你自己的问题:) 忽略。
  • 您的文件有多大/多小?
  • @MikhailBerlyant 它们可以是从几行到 100GB 左右的任何大小。真的,这取决于。

标签: csv google-cloud-platform google-bigquery google-cloud-dataflow


【解决方案1】:

我想到了一些纯粹的 BigQuery 解决方案:

  1. 用 bq 指定分隔符。这不适用于here 记录的这个用例

“分隔符可以是任何 ISO-8859-1 单字节字符。”

  1. 使用REGEXP_REPLACE,我得到的最好结果是单行,里面有一个换行符:
CREATE OR REPLACE TABLE test.separator_final AS
SELECT
  REGEXP_REPLACE(data, r"\\x01\\n", "\n") AS data
FROM
  test.separator_external

  1. 根据上一点,可以使用“hack”将行分解为不同的行(参见答案here)。但是,需要注意的是,您需要先验地知道拆分的数量,在这种情况下,它是不一致的。

  2. 您已经在使用但添加了一个行号以便可以将数据合并回来。这可能可行,但确保保留行顺序也可能很复杂。

如果我们考虑使用其他 GCP 产品作为 GCS 和 BigQuery 之间的中间人,我们可以找到其他有趣的解决方案:

  1. 使用 Dataprep,它在后台运行 Dataflow。有一个替换转换 (docs),可以通过编程方式生成和调用数据流模板。

  2. 使用数据流。我实际上用这个gist 测试了这个解决方案,它可以工作: 我认为可以通过创建一个模板(自定义分隔符可以是一个输入参数)很好地扩展这个模板,并在每次使用 Cloud Functions 将文件上传到 GCS 时触发它(NoOps 解决方案)。

简单地说,我们使用TextIO.read().from(file) 从文件中读取记录,其中file 是GCS 路径(启动作业时提供inputoutput 参数)。我们可以使用withDelimiter() 额外使用一个虚拟分隔符来避免冲突(这里我们再次限制为单个字节,所以我们不能直接传递真实的分隔符)。然后对于每一行,我们用c.element().split("\\\\x01\\\\n") 分隔实际分隔符。请注意,我们需要转义已经转义的字符(您可以在正常加载的 JSON 查询结果中验证这一点),因此需要四个反斜杠。

p
    .apply("GetMessages", TextIO.read().from(file))
        .apply("ExtractRows", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          for (String line : c.element().split("\\\\x01\\\\n")) {
            if (!line.isEmpty()) {
              c.output(line);
            }
          }
        }
    }))

结果:

请记住,正如 @hlagos 所指出的,由于 BigQuery 中的行限制或分配给 Dataflow 中单个工作人员的不可拆分步骤,您可能会遇到非常大的单行 CSV 问题。

【讨论】:

  • 这是一个很好的方法,感谢分享。我已经测试了使用 CloudFunctions 作为一种无服务器方法进行处理,但是 ~10m 的超时限制使得它无法用于更大的文件。我会说它可以始终适用于较小的源(可能低于 10 或 100M 行),但对于较大的源,它可能会超时。
  • 我的想法是仅使用 Cloud Function 来触发 Dataflow 作业(使用模板),而不是进行实际处理以避免超过超时。查看示例here
猜你喜欢
  • 2016-03-25
  • 1970-01-01
  • 2015-05-01
  • 1970-01-01
  • 1970-01-01
  • 2019-01-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多