【发布时间】:2021-05-02 20:01:43
【问题描述】:
带有运行时参数的数据流管道使用 DirectRunner 运行良好,但在切换到 DataflowRunner 时遇到参数错误。
File "/home/user/miniconda3/lib/python3.8/site-packages/apache_beam/options/pipeline_options.py", line 124, in add_value_provider_argument
self.add_argument(*args, **kwargs)
File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1386, in add_argument
return self._add_action(action)
File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1749, in _add_action
self._optionals._add_action(action)
File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1590, in _add_action
action = super(_ArgumentGroup, self)._add_action(action)
File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1400, in _add_action
self._check_conflict(action)
File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1539, in _check_conflict
conflict_handler(action, confl_optionals)
File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1548, in _handle_conflict_error
raise ArgumentError(action, message % conflict_string)
argparse.ArgumentError: argument --bucket_input: conflicting option string: --bucket_input
这是参数的定义和调用方式
class CustomPipelineOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--bucket_input',
default="device-file-dev",
help='Raw device file bucket')
pipeline = beam.Pipeline(options=pipeline_options)
custom_options = pipeline_options.view_as(CustomPipelineOptions)
_ = (
pipeline
| 'Initiate dataflow' >> beam.Create(["Start"])
| 'Create P collection with file paths' >> beam.ParDo(
CreateGcsPCol(input_bucket=custom_options.bucket_input)
)
请注意,这只发生在 DataflowRunner 上。任何人都知道如何解决它?非常感谢。
【问题讨论】:
-
错误告诉你
parser已经有一个带有''- -bucket_input'` 标志的参数。您不应该再次尝试添加它。 -
感谢@hpaulj。然而,它与数据流的 DirectRunner 一起运行良好——那里的所有参数都被正确解析和使用。知道为什么只有 DataflowRunner 会失败吗?
-
您的代码看起来正确,所以我认为问题出在其他地方。你是如何执行脚本的?笔记本?命令行?还有什么?
-
@Cubez 你是对的。该错误是由通过相对路径导入本地 python 子模块引起的。使用 DirectRunner,相对路径有效,但不适用于 DataflowRunner。通过同时安装数据流管道模块和子模块,并从已安装的子模块导入而不是使用相对路径来解决问题。
-
@YunChen,您能详细说明您的解决方案吗?安装数据流管道模块和子模块并从子模块导入而不是使用相对路径是什么意思?您是否使用 --setup_file 参数将某些内容传递给脚本?任何建议都会很棒!
标签: python python-3.x google-cloud-dataflow apache-beam argparse