【Posted】: 2020-03-31 17:11:18
【Problem Description】:
I have a pipeline whose final step must write two records to BigQuery, and I can't figure out why it appears to insert nothing. I get no errors, the table exists and already contains records, and I do need TRUNCATE/INSERT semantics.
Can someone help me figure out why it isn't working as I expect?
Here is my pipeline:
p = beam.Pipeline(options=pipeline_options)
(p
 | 'Read Configuration Table ' >> beam.io.Read(beam.io.BigQuerySource(config['ENVIRONMENT']['configuration_table']))
 | 'Get Files from Server' >> beam.Map(import_file)
 | 'Upload files on Bucket' >> beam.Map(upload_file_on_bucket)
 | 'Set record update' >> beam.Map(set_last_step)
 | 'Update table' >> beam.io.gcp.bigquery.WriteToBigQuery(
        table=config['ENVIRONMENT']['configuration_table'],
        write_disposition='WRITE_TRUNCATE',
        schema='folder:STRING, last_file:STRING')
)
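One of the comments below suggests verifying what the destination actually resolves to. A minimal sketch of that check (the configparser setup and the table string here are hypothetical stand-ins for my real configuration file):

```python
import configparser
import logging

# Hypothetical config; in the real pipeline this comes from the loaded
# configuration file.
config = configparser.ConfigParser()
config.read_dict({'ENVIRONMENT': {'configuration_table': 'MYPROJECT:MYDATASET.MYTABLE'}})

# Log the table spec that WriteToBigQuery will receive, so a typo or an
# unset key shows up before the job is submitted.
table_spec = config['ENVIRONMENT']['configuration_table']
logging.info('WriteToBigQuery destination: %s', table_spec)
print(table_spec)
```

Logging this before building the pipeline makes it easy to compare against the fully qualified name that appears in the Dataflow logs.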
with
def set_last_step(file_list):
    logging.info(msg='UPDATE CONFIGURATION TABLE - working on: ' + str(file_list))
    folder = ''
    if 'original' in file_list:
        if '1951' in file_list:
            folder = '1951'
        else:
            folder = '1952'
        dic = {'folder': folder, 'last_file': file_list['original']}
        logging.info(msg='UPDATE CONFIGURATION TABLE - no work done, reporting original record: ' + str(dic))
    else:
        folder = list(file_list.keys())[0]
        path = list(file_list.values())[0]
        dic = {'folder': folder, 'last_file': path}
        logging.info(msg='UPDATE CONFIGURATION TABLE - work done, reporting new record: ' + str(dic))
        purge(dir=os.path.join(HOME_PATH, 'download'), pattern=folder + "_")
    logging.info(msg='UPDATE CONFIGURATION TABLE - record to be updated: ' + str(dic))
    return dic
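The branching in set_last_step can be reproduced in isolation (a sketch with purge and the logging calls stubbed out, same routing as above) to confirm which record shape reaches WriteToBigQuery:

```python
def route(file_list):
    # Same branching as set_last_step, minus the logging/purge side effects.
    if 'original' in file_list:
        folder = '1951' if '1951' in file_list else '1952'
        return {'folder': folder, 'last_file': file_list['original']}
    folder = list(file_list.keys())[0]
    path = list(file_list.values())[0]
    return {'folder': folder, 'last_file': path}

print(route({'1952': '1952_2019120617.log.gz'}))
# → {'folder': '1952', 'last_file': '1952_2019120617.log.gz'}
```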
The input records to the WriteToBigQuery stage (the 'Update table' step) are:
{'folder': '1952', 'last_file': '1952_2019120617.log.gz'}
{'folder': '1951', 'last_file': '1951_2019120617.log.gz'}
The debug output from Dataflow is:
2019-12-06 18:09:36 DEBUG Creating or getting table <TableReference
datasetId: 'MYDATASET'
projectId: 'MYPROJECT'
tableId: 'MYTABLE'> with schema {'fields': [{'name': 'folder', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'last_file', 'type': 'STRING', 'mode': 'NULLABLE'}]}.
2019-12-06 18:09:36 DEBUG Created the table with id MYTABLE
2019-12-06 18:09:36 INFO Created table MYPROJECT.MYDATASET.MYTABLE with schema <TableSchema
fields: [<TableFieldSchema
fields: []
mode: 'NULLABLE'
name: 'folder'
type: 'STRING'>, <TableFieldSchema
fields: []
mode: 'NULLABLE'
name: 'last_file'
type: 'STRING'>]>. Result: <Table
creationTime: 1575652176727
etag: '0/GXOOeXPCmYsMfgGNxl2Q=='
id: 'MYPROJECT:MYDATASET.MYTABLE'
kind: 'bigquery#table'
lastModifiedTime: 1575652176766
location: 'EU'
numBytes: 0
numLongTermBytes: 0
numRows: 0
schema: <TableSchema
fields: [<TableFieldSchema
fields: []
mode: 'NULLABLE'
name: 'folder'
type: 'STRING'>, <TableFieldSchema
fields: []
mode: 'NULLABLE'
name: 'last_file'
type: 'STRING'>]>
selfLink: 'https://www.googleapis.com/bigquery/v2/projects/MYPROJECT/datasets/MYDATASET/tables/MYTABLE'
tableReference: <TableReference
datasetId: 'MYDATASET'
projectId: 'MYPROJECT'
tableId: 'MYTABLE'> with schema {'fields': [{'name': 'folder', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'last_file', 'type': 'STRING', 'mode': 'NULLABLE'}]}.
2019-12-06 18:09:36 DEBUG Created the table with id MYTABLE
2019-12-06 18:09:36 INFO Created table MYPROJECT.MYDATASET.MYTABLE with schema <TableSchema
fields: [<TableFieldSchema
fields: []
mode: 'NULLABLE'
name: 'folder'
type: 'STRING'>, <TableFieldSchema
fields: []
mode: 'NULLABLE'
name: 'last_file'
type: 'STRING'>]>. Result: <Table
creationTime: 1575652176727
etag: '0/GXOOeXPCmYsMfgGNxl2Q=='
id: 'MYPROJECT:MYDATASET.MYTABLE'
kind: 'bigquery#table'
lastModifiedTime: 1575652176766
location: 'EU'
numBytes: 0
numLongTermBytes: 0
numRows: 0
schema: <TableSchema
fields: [<TableFieldSchema
fields: []
mode: 'NULLABLE'
name: 'folder'
type: 'STRING'>, <TableFieldSchema
fields: []
mode: 'NULLABLE'
name: 'last_file'
type: 'STRING'>]>
selfLink: 'https://www.googleapis.com/bigquery/v2/projects/MYPROJECT/datasets/MYDATASET/tables/MYTABLE'
tableReference: <TableReference
datasetId: 'MYDATASET'
projectId: 'MYPROJECT'
tableId: 'MYTABLE'>
type: 'TABLE'>.
2019-12-06 18:09:36 WARNING Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
2019-12-06 18:12:06 DEBUG Attempting to flush to all destinations. Total buffered: 2
2019-12-06 18:12:06 DEBUG Flushing data to MYPROJECT:MYDATASET.MYTABLE. Total 2 rows.
2019-12-06 18:12:07 DEBUG Passed: True. Errors are []
【Comments】:
-
I agree with @GuillemXercavins. How are you verifying that the records don't appear in BigQuery? Could you try running a query against the results?
-
Hi, thanks to you both for the answers! I know that records inserted via streaming can show up after a delay, but I'm sure my records were not inserted into BigQuery: I ran a query against the data and got no results, I waited 2 hours (the streaming buffer should flush within 90 minutes), and finally I forced a flush of the buffer with: CREATE OR REPLACE TABLE MYPROJECT.MYDATASET.MYTABLE AS SELECT * FROM MYPROJECT.MYDATASET.MYTABLE. Am I missing something?
-
Given that the fully qualified table name in the logs is 'MYPROJECT:MYDATASET.MYTABLE', it looks like your config isn't being populated. Does table=config['ENVIRONMENT']['configuration_table'] actually work? Could you try logging it?
-
Doesn't 'WRITE_TRUNCATE' overwrite the table data on every run? Maybe try 'WRITE_APPEND' instead.
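Following up on that suggestion, the sink from the question with only the disposition swapped would look like the fragment below (not runnable on its own, and whether it resolves the visibility issue is untested here; the other arguments mirror the question's code):

```python
# Fragment: drop-in replacement for the 'Update table' step,
# with WRITE_APPEND instead of WRITE_TRUNCATE.
| 'Update table' >> beam.io.gcp.bigquery.WriteToBigQuery(
        table=config['ENVIRONMENT']['configuration_table'],
        write_disposition='WRITE_APPEND',
        schema='folder:STRING, last_file:STRING')
```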
Tags: python-3.x google-bigquery etl google-cloud-dataflow apache-beam