【问题标题】:Using Apache Beam GCP DataflowRunner to write to BigQuery (Python)使用 Apache Beam GCP DataflowRunner 写入 BigQuery (Python)
【发布时间】:2021-03-25 06:48:28
【问题描述】:

尝试在 Apache Beam (Python) 中编写一个管道,该管道将从 GCP 存储桶中读取输入文件,应用转换然后写入 BigQuery。

以下是 Apache Beam 管道的摘录:

import logging
import apache_beam as beam
import argparse
import csv
import json
from apache_beam.io.gcp.internal.clients import bigquery

def build_argument_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=True,
        help='Input file to process.')
    parser.add_argument(
        '--project',
        dest='project',
        required=True,
        help='Project ID.'
    )
    parser.add_argument(
        '--datasetId',
        dest='datasetId',
        required=True,
        help='BigQuery dataset ID.'
    )
    parser.add_argument(
        '--tableId',
        dest='tableId',
        required=True,
        help='BigQuery table ID.'
    )
    return parser


def create_pipeline_options(pipeline_arguments):
    pipeline_options = beam.options.pipeline_options.PipelineOptions(pipeline_arguments)
    pipeline_options.view_as(beam.options.pipeline_options.SetupOptions).save_main_session = True
    return pipeline_options


def run(argv=None):
    parser = build_argument_parser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    table_spec = beam.io.gcp.internal.clients.bigquery.TableReference(
        projectId=known_args.project,
        datasetId=known_args.datasetId,
        tableId=known_args.tableId
    )

    table_schema = {
        'fields': [
            ...
        ]
    }

    with beam.Pipeline(options=create_pipeline_options(pipeline_args)) as p:
        input_data = p | "Read input csv" >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)

        input_data_as_json = input_data | "Transforming to json" >> (
            beam.ParDo(
                TransformCsvToJson(build_field_mapping_names())
            )
        ).with_outputs('error', 'success')

        input_data_as_json.success | beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            additional_bq_parameters={'timePartitioning': {
                'type': 'MONTH'
            }}
        )


if __name__ == '__main__':
    run()

这是执行管道的脚本:

    #!/usr/bin/env bash

python ../pipelines/pipeline.py \
    --input "gs://storage/input_file.csv" \
    --runner DataFlowRunner \
    --project "project_name" \
    --datasetId "dataset_name" \
    --tableId "table_name" \
    --region europe-west2 \
    --staging_location "gs://storage/staging" \
    --temp_location "gs://storage/temp"

这是我得到的错误:

    /bin/zsh path/utils/execute_pipeline.sh
Traceback (most recent call last):
  File "../pipelines/pipeline.py", line 138, in <module>
    run()
  File "../pipelines/pipeline.py", line 117, in run
    with beam.Pipeline(options=create_pipeline_options(pipeline_args)) as p:
  File "/environment_path/lib/python3.8/site-packages/apache_beam/pipeline.py", line 202, in __init__
    raise ValueError(
ValueError: Pipeline has validations errors: 
Missing required option: project.
  1. 当我使用 DirectRunner 时,我没有收到此错误。
  2. 当我使用 DataflowRunner 时,管道无需写入 BigQuery 即可工作。
  3. 当我使用 DataflowRunner 并对项目、datasetId 和 tableId 进行硬编码时,它可以正常工作。

更新

找到了罪魁祸首,虽然不知道为什么会这样......

修改以下作品

def run(argv=None):
    parser = build_argument_parser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    #this bit below is new
    pipeline_args.extend([
        '--project=' + known_args.projectId
    ])

当我打印 known_args 和 pipeline_args 时,“项目”没有出现。虽然我不确定为什么它没有从执行脚本中获取值。

【问题讨论】:

  • 您使用的是什么版本的 Beam?
  • @Pablo Beam 版本 = 2.25
  • @Pablo 编辑了原始帖子并添加了一些额外的发现。

标签: python google-cloud-platform google-cloud-dataflow apache-beam apache-beam-io


【解决方案1】:

known_args, pipeline_args = parser.parse_known_args(argv) 行将参数分成两部分,解析器知道的那些参数 (known_args) 和不知道的那些。因为您有一个名为project 的标志,所以它的值被放入known_args 并且永远不会进入pipeline_args。您可以确保解析器中的参数与预期通过pipeline_args 传递的参数不相交,或者您可以在解析后扩充pipeline_args

【讨论】:

  • 您能否解释一下“您可以确保解析器中的参数与预期通过 pipeline_args 传递的参数不相交”的意思?我忘了在上面提到,如果我删除“parser.add_argument('--project'”部分,project 仍然没有被 pipeline_args 或 known_args 拾取
  • 使用 parser.add_argument('--project'.... 删除,这是 known_args 和 pipeline_args 中的内容: known_args: Namespace(datasetId='datasetId', input='gs://input /input_file.csv', projectId='projectId', tableId='tableId') pipeline_args: ['--runner', 'DataflowRunner', '--region', 'europe-west2', '--temp_location', ' gs://temp_location/']
  • 你还在传递 --project 作为标志吗?
  • 是的,我还在 shell 脚本中传递项目。
猜你喜欢
  • 2022-06-27
  • 1970-01-01
  • 2021-10-10
  • 2021-05-20
  • 1970-01-01
  • 2021-05-08
  • 2020-12-21
  • 1970-01-01
  • 2020-05-01
相关资源
最近更新 更多