[Posted]: 2021-03-25 06:48:28
[Problem description]:
I'm trying to write a pipeline in Apache Beam (Python) that reads an input file from a GCP storage bucket, applies a transform, and then writes to BigQuery.
Here is an excerpt from the Apache Beam pipeline:
import logging
import apache_beam as beam
import argparse
import csv
import json
from apache_beam.io.gcp.internal.clients import bigquery


def build_argument_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=True,
        help='Input file to process.')
    parser.add_argument(
        '--project',
        dest='project',
        required=True,
        help='Project ID.'
    )
    parser.add_argument(
        '--datasetId',
        dest='datasetId',
        required=True,
        help='BigQuery dataset ID.'
    )
    parser.add_argument(
        '--tableId',
        dest='tableId',
        required=True,
        help='BigQuery table ID.'
    )
    return parser


def create_pipeline_options(pipeline_arguments):
    pipeline_options = beam.options.pipeline_options.PipelineOptions(pipeline_arguments)
    pipeline_options.view_as(beam.options.pipeline_options.SetupOptions).save_main_session = True
    return pipeline_options


def run(argv=None):
    parser = build_argument_parser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    table_spec = beam.io.gcp.internal.clients.bigquery.TableReference(
        projectId=known_args.project,
        datasetId=known_args.datasetId,
        tableId=known_args.tableId
    )

    table_schema = {
        'fields': [
            ...
        ]
    }

    with beam.Pipeline(options=create_pipeline_options(pipeline_args)) as p:
        input_data = p | "Read input csv" >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
        input_data_as_json = input_data | "Transforming to json" >> (
            beam.ParDo(
                TransformCsvToJson(build_field_mapping_names())
            )
        ).with_outputs('error', 'success')

        input_data_as_json.success | beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            additional_bq_parameters={'timePartitioning': {
                'type': 'MONTH'
            }}
        )


if __name__ == '__main__':
    run()
Here is the script that executes the pipeline:
#!/usr/bin/env bash
python ../pipelines/pipeline.py \
--input "gs://storage/input_file.csv" \
--runner DataFlowRunner \
--project "project_name" \
--datasetId "dataset_name" \
--tableId "table_name" \
--region europe-west2 \
--staging_location "gs://storage/staging" \
--temp_location "gs://storage/temp"
And this is the error I get:
/bin/zsh path/utils/execute_pipeline.sh
Traceback (most recent call last):
File "../pipelines/pipeline.py", line 138, in <module>
run()
File "../pipelines/pipeline.py", line 117, in run
with beam.Pipeline(options=create_pipeline_options(pipeline_args)) as p:
File "/environment_path/lib/python3.8/site-packages/apache_beam/pipeline.py", line 202, in __init__
raise ValueError(
ValueError: Pipeline has validations errors:
Missing required option: project.
- When I use the DirectRunner, I don't get this error.
- When I use the DataflowRunner, the pipeline works as long as it doesn't write to BigQuery.
- When I use the DataflowRunner and hard-code the project, datasetId and tableId, it works fine.
UPDATE
Found the culprit, although I have no idea why this happens...
Modifying the code as follows works:
def run(argv=None):
    parser = build_argument_parser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    # this bit below is new: re-add --project so PipelineOptions can see it
    # (the parser stores it under dest='project')
    pipeline_args.extend([
        '--project=' + known_args.project
    ])
When I print known_args and pipeline_args, 'project' does not show up in pipeline_args, although I'm not sure why it isn't picking up the value from the execution script.
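A likely explanation for the behaviour above (my reading, not confirmed in the post): `parse_known_args` consumes every flag the parser knows about. Because the script declares its own `--project` argument, argparse strips `--project` out of the leftover list, so `PipelineOptions` never sees it and Dataflow's validation fails. A minimal stand-alone repro:

```python
# Minimal repro sketch: argparse removes known flags from the leftovers,
# so a self-declared --project never reaches PipelineOptions.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--project', dest='project', required=True)

argv = ['--project', 'project_name', '--runner', 'DataFlowRunner']
known_args, pipeline_args = parser.parse_known_args(argv)

print(known_args.project)  # 'project_name' -- consumed by argparse
print(pipeline_args)       # ['--runner', 'DataFlowRunner'] -- no --project left
```

This is why the `pipeline_args.extend(['--project=' + ...])` workaround fixes it: it puts the flag back into the list that `PipelineOptions` parses.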
[Discussion]:
-
What version of Beam are you using?
-
@Pablo Beam version = 2.25
-
@Pablo Edited the original post and added some additional findings.
Tags: python google-cloud-platform google-cloud-dataflow apache-beam apache-beam-io