【发布时间】:2021-10-07 09:48:00
【问题描述】:
这个问题与this one 非常相似——但使用了python API,而且几年后——由于解决方案不完整,我认为最好打开一个新问题。
我有一堆 ndjson 文件,每天在 GCS 上生成,我想将它们加载到 BQ 表中。
来自file_1.json的几行示例:
{"a": 1, "b": 2, "c": [1,2,4], "d": "string"}
{"a": 1, "c": [2, 4], "d": "some_string"}
{"a": 1, "e": 4}
file_2.json 的示例:
{"a": 4, "e": 6, "f": {"g": 7, "h": "str"}}
{"a": 1, "c": [2, 4], "f": {"g": 5}}
大多数字段是可选的,理论上每个文件中的每个文件记录都可以包含字段的任意组合(目前从大约 50 个选项中选择,但将来会改变和增长)。同名字段的内容应该(抛开任何错误)包含相同的数据类型。
我想将一组文件(某一天的所有文件)加载到 BQ 表中,其架构将由文件中所有字段的联合组成,其中一行没有键的情况下为 NULL 值对应一个字段。
我希望在没有需要维护的架构的情况下这样做。
我目前的尝试:
job_config = bigquery.LoadJobConfig(
write_disposition="WRITE_TRUNCATE",
create_disposition="CREATE_IF_NEEDED",
autodetect=True,
ignore_unknown_values=True,
schema_update_option="ALLOW_FIELD_ADDITION",
source_format="NEWLINE_DELIMITED_JSON"
)
uri = "gs://my-bucket/test/*.json"
load_job = client.load_table_from_uri(
uri,
table_id,
location="EU",
job_config=job_config,
)
load_job.result()
使用autodetect=True 来避免显式指定架构,但由于自动检测从单个文件中扫描多达 500 行 - 最初可能不会创建某些字段。
我希望schema_update_option="ALLOW_FIELD_ADDITION" (ref here) 能够满足我的需求,但它不起作用。
我尝试过的另一个选项:
# Instead of wildcard, get blob list and convert to uris
blobs = storage_client.get_bucket(BUCKET).list_blobs(prefix=FOLDER)
uris = [f"gs://{BUCKET}/{blob.name}" for blob in blobs if blob.name.endswith(".json")]
job_config = bigquery.LoadJobConfig(
write_disposition="WRITE_APPEND", #Will append file by file
create_disposition="CREATE_IF_NEEDED",
autodetect=True,
schema_update_option="ALLOW_FIELD_ADDITION",
source_format="NEWLINE_DELIMITED_JSON"
)
for uri in uris:
load_job = client.load_table_from_uri(
uri,
table_id,
location="EU",
job_config=job_config,
)
load_job.result()
我想这是效率较低的 - 但无论如何它也不起作用 - 得到这个错误:
BadRequest: 400 Provided Schema does not match Table my-bucket:test.test_diff_schema. Cannot add fields (field: f)
再一次,我想schema_update_option="ALLOW_FIELD_ADDITION" 会避免 - 但它似乎只适用于预定义的架构,而不是自动检测。
欢迎提出任何想法,在此先感谢!
【问题讨论】:
标签: python json google-bigquery google-cloud-storage