【Posted】: 2018-07-22 07:39:36
【Question】:
I have a Dataflow job that writes to BigQuery. It works for a non-nested schema, but not for a nested schema.
Here is my Dataflow pipeline:
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions

# WordcountTemplatedOptions and processFunction are defined elsewhere (not shown).
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)

# Flat 'name: TYPE' schema string; nested fields are spelled with dotted names.
schema = 'url: STRING,' \
         'ua: STRING,' \
         'method: STRING,' \
         'man: RECORD,' \
         'man.ip: RECORD,' \
         'man.ip.cc: STRING,' \
         'man.ip.city: STRING,' \
         'man.ip.as: INTEGER,' \
         'man.ip.country: STRING,' \
         'man.res: RECORD,' \
         'man.res.ip_dom: STRING'

first = p | 'read' >> ReadFromText(wordcount_options.input)
second = (first
          | 'process' >> (beam.ParDo(processFunction()))
          | 'write' >> beam.io.WriteToBigQuery(
              'myBucket:tableFolder.test_table',
              schema=schema)
          )
I created the BigQuery table with the following schema:
[
  {
    "mode": "NULLABLE",
    "name": "url",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "ua",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "method",
    "type": "STRING"
  },
  {
    "mode": "REPEATED",
    "name": "man",
    "type": "RECORD",
    "fields":
    [
      {
        "mode": "REPEATED",
        "name": "ip",
        "type": "RECORD",
        "fields":
        [
          {
            "mode": "NULLABLE",
            "name": "cc",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "city",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "as",
            "type": "INTEGER"
          },
          {
            "mode": "NULLABLE",
            "name": "country",
            "type": "STRING"
          }
        ]
      },
      {
        "mode": "REPEATED",
        "name": "res",
        "type": "RECORD",
        "fields":
        [
          {
            "mode": "NULLABLE",
            "name": "ip_dom",
            "type": "STRING"
          }
        ]
      }
    ]
  }
]
I get the following error:
BigQuery creation of import job for table "test_table" in dataset "tableFolder" in project "myBucket" failed., BigQuery execution failed., HTTP transport error:
Message: Invalid value for: url is not a valid value
HTTP Code: 400
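Question: Can someone guide me? What exactly am I doing wrong? Also, if there is a better way to iterate over all the nested fields and write them to BigQuery, please suggest it.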
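On the nested-schema part of the question, one possible direction is sketched here. The comma-separated schema string is documented by Beam as a flat list of `name:type` fields, so dotted names such as `man.ip.cc` are probably not interpreted as nesting. The hypothetical helper `nest_schema` below (not part of Beam) turns the dotted spec from the question into a nested `{"fields": [...]}` dictionary. It assumes every field is `NULLABLE`, which matches the sample data where `man`, `ip` and `res` are single objects rather than lists. Recent Beam Python releases document `WriteToBigQuery(schema=...)` as also accepting such a dictionary, but whether the version in use does is an assumption to verify.

```python
import json


def nest_schema(flat_spec):
    """Convert 'a: STRING,b: RECORD,b.c: STRING' into a nested schema dict.

    Assumption: every field is NULLABLE; parents are listed before children.
    """
    root = {"fields": []}
    index = {"": root}  # dotted path -> node that owns a "fields" list
    for item in flat_spec.split(","):
        path, field_type = (part.strip() for part in item.split(":"))
        parent_path, _, name = path.rpartition(".")
        node = {"name": name, "type": field_type, "mode": "NULLABLE"}
        if field_type == "RECORD":
            node["fields"] = []
            index[path] = node  # children will attach here
        index[parent_path]["fields"].append(node)
    return root


# The flat spec from the question, unchanged.
FLAT_SPEC = (
    "url: STRING,ua: STRING,method: STRING,"
    "man: RECORD,man.ip: RECORD,"
    "man.ip.cc: STRING,man.ip.city: STRING,"
    "man.ip.as: INTEGER,man.ip.country: STRING,"
    "man.res: RECORD,man.res.ip_dom: STRING"
)

nested = nest_schema(FLAT_SPEC)
print(json.dumps(nested, indent=2))
```

The resulting dictionary could then be passed as `schema=nested`, or converted to a `TableSchema` object, depending on what the installed Beam version supports.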
Additional information. My data file:
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"DK","city":"munlan","as":4865,"country":"United"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"GET","man":{"ip":{"cc":"BS","city":"sind","as":7655,"country":"India"},"res":{"ip_dom":"v1"}}}
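Each line of the data file above is one JSON object. The question's `processFunction` is not shown, so the following is only a hypothetical stand-in for its parsing step: a plain function, `parse_line` (an assumed name), that turns one text line into a nested dictionary. It uses only the standard library; inside a pipeline it could be called from a `DoFn` that yields the result when it is not `None`.

```python
import json


def parse_line(line):
    """Parse one newline-delimited JSON record into a dict.

    Returns None for blank lines, malformed JSON, or non-object values.
    """
    line = line.strip()
    if not line:
        return None
    try:
        row = json.loads(line)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None
    return row if isinstance(row, dict) else None


# First record from the data file in the question.
sample = (
    '{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT",'
    '"man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},'
    '"res":{"ip_dom":"v1"}}}'
)
row = parse_line(sample)
print(row["man"]["ip"]["city"])
```

Note that the parsed `man`, `ip` and `res` values are dictionaries, not lists, which matters when comparing them with the field modes in the table schema.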
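【Comments】:
-
First of all, why do you use WriteToBigQuery directly after ReadFromText? ReadFromText returns a PCollection of strings, which is not compatible with the input of WriteToBigQuery (it expects dictionaries).
-
Sorry, @marcin-zablocki. I have a process function in between. I have edited the question to add it. The process function returns a dictionary.
-
Does your dictionary contain the correct data types? Are you parsing the data correctly? It looks like a trivial type error.
-
It works for the non-nested table. I just tried it with two columns. I have added the dataset along with the process function. Can you guess what the error might be?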
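The data-type question raised in the comments can be checked mechanically. The sketch below is a hypothetical helper, `find_mismatches`, that walks a row dictionary against a list of schema fields in the same JSON layout as the table schema and reports places where they disagree. It covers only the `STRING`, `INTEGER` and `RECORD` types used in this thread, and it does not establish that such a mismatch is the cause of the reported HTTP 400 error.

```python
# Only the scalar types that appear in the question's schema.
SCALAR_TYPES = {"STRING": str, "INTEGER": int}


def find_mismatches(row, fields, prefix=""):
    """Return readable descriptions of where `row` disagrees with `fields`."""
    problems = []
    for field in fields:
        path = prefix + field["name"]
        value = row.get(field["name"])
        if value is None:
            continue  # missing values are not checked in this sketch
        if field.get("mode") == "REPEATED":
            if not isinstance(value, list):
                problems.append(path + ": schema is REPEATED but value is not a list")
                continue
            values = value
        else:
            values = [value]
        for item in values:
            if field["type"] == "RECORD":
                if isinstance(item, dict):
                    problems.extend(
                        find_mismatches(item, field.get("fields", []), path + "."))
                else:
                    problems.append(path + ": expected a dict for RECORD")
            elif not isinstance(item, SCALAR_TYPES.get(field["type"], object)):
                problems.append(path + ": expected " + field["type"])
    return problems


# Trimmed version of the table schema and one data row from the question.
table_fields = [
    {"name": "url", "type": "STRING", "mode": "NULLABLE"},
    {"name": "man", "type": "RECORD", "mode": "REPEATED", "fields": [
        {"name": "ip", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "as", "type": "INTEGER", "mode": "NULLABLE"},
        ]},
    ]},
]
row = {"url": "xyz.com", "man": {"ip": {"as": 274}}}
for problem in find_mismatches(row, table_fields):
    print(problem)
```

With the sample data, the check flags `man` because the table declares it `REPEATED` while the row carries a single object. Declaring the records `NULLABLE`, or wrapping the objects in lists, would make the two agree.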
Tags: python google-cloud-platform google-bigquery google-cloud-dataflow apache-beam