【发布时间】:2020-01-13 13:12:04
【问题描述】:
我尝试在 Google Cloud Dataflow 中运行由 Google Cloud Coomposer 中的 DAG 触发的 Apache Beam 管道 (Python)。
我的 dags 文件夹在各个 GCS 存储桶中的结构如下:
/dags/
dataflow.py <- DAG
dataflow/
pipeline.py <- pipeline
setup.py
my_modules/
__init__.py
commons.py <- the module I want to import in the pipeline
setup.py 非常基本,但根据 Apache Beam 文档和 SO 上的答案:
import setuptools
setuptools.setup(setuptools.find_packages())
在 DAG 文件 (dataflow.py) 中,我设置了 setup_file 选项并将其传递给 Dataflow:
default_dag_args = {
... ,
'dataflow_default_options': {
... ,
'runner': 'DataflowRunner',
'setup_file': os.path.join(configuration.get('core', 'dags_folder'), 'dataflow', 'setup.py')
}
}
在管道文件(pipeline.py)中我尝试使用
from my_modules import commons
但这失败了。 Google Cloud Composer (Apache Airflow) 中的日志显示:
gcp_dataflow_hook.py:132} WARNING - b' File "/home/airflow/gcs/dags/dataflow/dataflow.py", line 11\n from my_modules import commons\n ^\nSyntaxError: invalid syntax'
setup.py 文件背后的基本思想记录在 here
另外,关于 SO 的类似问题对我有帮助:
Google Dataflow - Failed to import custom python modules
Dataflow/apache beam: manage custom module dependencies
我实际上想知道为什么我的管道失败并出现 Syntax Error 而不是 module not found 类型的错误...
【问题讨论】:
-
看起来像语法问题。你有没有正确检查空格,因为它是 python ?
标签: airflow google-cloud-dataflow apache-beam google-cloud-composer