【Question Title】: Writing nested schema to BigQuery from Dataflow (Python)
【Posted】: 2018-07-22 07:39:36
【Question】:

I have a Dataflow job that writes to BigQuery. It works for non-nested schemas, but fails for nested ones.

Here is my Dataflow pipeline:

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)

schema = 'url: STRING,' \
         'ua: STRING,' \
         'method: STRING,' \
         'man: RECORD,' \
         'man.ip: RECORD,' \
         'man.ip.cc: STRING,' \
         'man.ip.city: STRING,' \
         'man.ip.as: INTEGER,' \
         'man.ip.country: STRING,' \
         'man.res: RECORD,' \
         'man.res.ip_dom: STRING'

first = p | 'read' >> ReadFromText(wordcount_options.input)
second = (first
          | 'process' >> (beam.ParDo(processFunction()))
          | 'write' >> beam.io.WriteToBigQuery(
              'myBucket:tableFolder.test_table',
              schema=schema)
          )

I created the BigQuery table with the following schema:

[
  {
    "mode": "NULLABLE",
    "name": "url",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "ua",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "method",
    "type": "STRING"
  },
  {
    "mode": "REPEATED",
    "name": "man",
    "type": "RECORD",
    "fields":
      [
        {
          "mode": "REPEATED",
          "name": "ip",
          "type": "RECORD",
          "fields":
            [
              {
                "mode": "NULLABLE",
                "name": "cc",
                "type": "STRING"
              },
              {
                "mode": "NULLABLE",
                "name": "city",
                "type": "STRING"
              },
              {
                "mode": "NULLABLE",
                "name": "as",
                "type": "INTEGER"
              },
              {
                "mode": "NULLABLE",
                "name": "country",
                "type": "STRING"
              }
            ]
        },
        {
          "mode": "REPEATED",
          "name": "res",
          "type": "RECORD",
          "fields":
            [
              {
                "mode": "NULLABLE",
                "name": "ip_dom",
                "type": "STRING"
              }
            ]
        }
      ]
  }
]

I get the following error:

BigQuery creation of import job for table "test_table" in dataset "tableFolder" in project "myBucket" failed., BigQuery execution failed., HTTP transport error:
 Message: Invalid value for: url is not a valid value
 HTTP Code: 400

Question: Can someone guide me here? What exactly am I doing wrong? Also, if there is a better way to iterate over all the nested schemas and write to BigQuery, please suggest it.

Additional information: my data file:

{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"DK","city":"munlan","as":4865,"country":"United"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"GET","man":{"ip":{"cc":"BS","city":"sind","as":7655,"country":"India"},"res":{"ip_dom":"v1"}}}

【Comments】:

  • First of all, why do you use WriteToBigQuery directly after ReadFromText? ReadFromText returns a PCollection of strings, which is not compatible with the input of WriteToBigQuery (it expects dictionaries).
  • Sorry @marcin-zablocki, I have a process function between the two; I've edited my question to add it. The process function returns a dictionary.
  • Does your dictionary contain the correct data types? Are you parsing the data correctly? It looks like a trivial type error.
  • It works for non-nested tables; I just tried it with two columns. I have added the dataset along with the process function. Can you guess what the error might be?

Tags: python google-cloud-platform google-bigquery google-cloud-dataflow apache-beam


【Solution 1】:

The problem with your code is that you are trying to use nested fields while specifying the BigQuery table schema as a string, which is not supported. To push nested records from Apache Beam into BigQuery, you need to create a TableSchema object, i.e. use the built-in parser:

from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
table_schema = parse_table_schema_from_json(your_bigquery_json_schema)

You need to pass the schema as a JSON string here; you can obtain it with the following command in your terminal (assuming you have the gcloud tools installed):

bq --project=your-gcp-project-name --format=json show your.table.name > schema.json

Then use it in Python like this:

table_schema = parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))["schema"]))

And then in your pipeline:

beam.io.WriteToBigQuery(
    'myBucket:tableFolder.test_table',
    schema=table_schema)
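If you prefer not to shell out to bq, you can also assemble the JSON schema string directly in Python; a stdlib-only sketch for the nested man record from the question (the final parse_table_schema_from_json call, shown as a comment, is what the pipeline would consume):

```python
import json

# Nested schema for the question's table, in the {"fields": [...]} shape
# that parse_table_schema_from_json expects as a JSON string.
schema_json = json.dumps({
    "fields": [
        {"name": "url", "type": "STRING", "mode": "NULLABLE"},
        {"name": "ua", "type": "STRING", "mode": "NULLABLE"},
        {"name": "method", "type": "STRING", "mode": "NULLABLE"},
        {"name": "man", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "ip", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "cc", "type": "STRING", "mode": "NULLABLE"},
                {"name": "city", "type": "STRING", "mode": "NULLABLE"},
                {"name": "as", "type": "INTEGER", "mode": "NULLABLE"},
                {"name": "country", "type": "STRING", "mode": "NULLABLE"},
            ]},
            {"name": "res", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "ip_dom", "type": "STRING", "mode": "NULLABLE"},
            ]},
        ]},
    ]
})

# Then, in the pipeline code:
# table_schema = parse_table_schema_from_json(schema_json)
```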

You can also look at the example showing manual creation of a TableSchema object: https://github.com/apache/beam/blob/474345f5987e47a22d063c7bfcb3638c85a57e64/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py

Here it is (from the linked example):

from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()
full_name_schema = bigquery.TableFieldSchema()
full_name_schema.name = 'fullName'
full_name_schema.type = 'string'
full_name_schema.mode = 'required'
table_schema.fields.append(full_name_schema)

# A nested field
phone_number_schema = bigquery.TableFieldSchema()
phone_number_schema.name = 'phoneNumber'
phone_number_schema.type = 'record'
phone_number_schema.mode = 'nullable'

number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
phone_number_schema.fields.append(number)

area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
phone_number_schema.fields.append(area_code)

# Append the nested record to the table schema once, after all of
# its subfields have been added.
table_schema.fields.append(phone_number_schema)

Then use the table_schema variable in beam.io.WriteToBigQuery.
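On the question's side note about iterating over all nested schemas: a small recursive walk over the JSON-style schema dict can enumerate every leaf column as a dotted path (the field_paths helper is hypothetical, not part of Beam):

```python
def field_paths(fields, prefix=""):
    # Recursively yield a dotted path for every leaf field in a
    # BigQuery-style schema: a list of {"name", "type", ...} dicts
    # where RECORD fields carry their children under "fields".
    for f in fields:
        path = prefix + f["name"]
        if f["type"] == "RECORD":
            yield from field_paths(f.get("fields", []), path + ".")
        else:
            yield path

# A cut-down version of the question's schema, for illustration.
example = {"fields": [
    {"name": "url", "type": "STRING", "mode": "NULLABLE"},
    {"name": "man", "type": "RECORD", "mode": "REPEATED", "fields": [
        {"name": "ip", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "cc", "type": "STRING", "mode": "NULLABLE"},
            {"name": "city", "type": "STRING", "mode": "NULLABLE"},
        ]},
        {"name": "res", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "ip_dom", "type": "STRING", "mode": "NULLABLE"},
        ]},
    ]},
]}

paths = list(field_paths(example["fields"]))
```

This kind of walk is handy for validating incoming row dicts against the schema before writing.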

【Comments】:

  • Marcin, do I need to pass your_bigquery_json_schema as a string? Or just copy-paste the JSON schema into it, like this: table_schema = parse_table_schema_from_json([{......}])? Sorry, I'm a bit confused. Could you expand your answer?
  • Thanks Marcin! I'll test this and update here. For now, the link you provided for creating a manual TableSchema object works, but I'll try parse_table_schema_from_json.
  • This is great! It may overstate the need for JSON, though: you can just provide the schema as a nested Python structure instead of JSON / the parse function.
  • Hi Maximilian, could you add that as an answer?
  • My schema has multiple levels of nesting. I get "TypeError: list indices must be integers or slices, not str" from parse_table_schema_from_json. Any suggestions?