【问题标题】:Google Dataflow: Import custom Python moduleGoogle Dataflow:导入自定义 Python 模块
【发布时间】:2020-01-13 13:12:04
【问题描述】:

我尝试在 Google Cloud Dataflow 中运行由 Google Cloud Coomposer 中的 DAG 触发的 Apache Beam 管道 (Python)。

我的 dags 文件夹在各个 GCS 存储桶中的结构如下:

/dags/
  dataflow.py <- DAG
  dataflow/
    pipeline.py <- pipeline
    setup.py
    my_modules/
      __init__.py
      commons.py <- the module I want to import in the pipeline

setup.py 非常基本,但根据 Apache Beam 文档和 SO 上的答案:

import setuptools

setuptools.setup(setuptools.find_packages())

在 DAG 文件 (dataflow.py) 中,我设置了 setup_file 选项并将其传递给 Dataflow:

default_dag_args = {
    ... ,
    'dataflow_default_options': {
        ... ,
        'runner': 'DataflowRunner',
        'setup_file': os.path.join(configuration.get('core', 'dags_folder'), 'dataflow', 'setup.py')
    }
}

在管道文件(pipeline.py)中我尝试使用

from my_modules import commons

但这失败了。 Google Cloud Composer (Apache Airflow) 中的日志显示:

gcp_dataflow_hook.py:132} WARNING - b'  File "/home/airflow/gcs/dags/dataflow/dataflow.py", line 11\n    from my_modules import commons\n           ^\nSyntaxError: invalid syntax'

setup.py 文件背后的基本思想记录在 here

另外,关于 SO 的类似问题对我有帮助:

Google Dataflow - Failed to import custom python modules

Dataflow/apache beam: manage custom module dependencies

我实际上想知道为什么我的管道失败并出现 Syntax Error 而不是 module not found 类型的错误...

【问题讨论】:

  • 看起来像语法问题。你有没有正确检查空格,因为它是 python ?

标签: airflow google-cloud-dataflow apache-beam google-cloud-composer


【解决方案1】:

我试图重现您的问题,然后尝试解决它,因此我创建了与您已有的相同的文件夹结构:

/dags/
  dataflow.py
  dataflow/
     pipeline.py -> pipeline
     setup.py
     my_modules/
        __init__.py
        common.py

因此,为了使其正常工作,我所做的更改是将这些文件夹复制到实例正在运行的代码能够找到它的位置,例如在实例的 /tmp/ 文件夹中。

所以,我的 DAG 应该是这样的:

1 - 我首先声明我的论点:

default_args = {
   'start_date': datetime(xxxx, x, x),
   'retries': 1,
   'retry_delay': timedelta(minutes=5),
   'dataflow_default_options': {
       'project': '<project>',
       'region': '<region>',
       'stagingLocation': 'gs://<bucket>/stage',
       'tempLocation': 'gs://<bucket>/temp',
       'setup_file': <setup.py>,
       'runner': 'DataflowRunner'
   }
} 

2- 之后,我创建了 DAG,在运行 Dataflow 任务之前,我将上面创建的整个文件夹目录复制到实例 Task t1/tmp/ 文件夹中,然后运行管道从/tmp/目录Task t2:

with DAG(
    'composer_df',
     default_args=default_args,
     description='datflow dag',
     schedule_interval="xxxx") as dag:

     def copy_dependencies():
          process = subprocess.Popen(['gsutil','cp', '-r' ,'gs://<bucket>/dags/*', 
          '/tmp/'])
          process.communicate()


     t1 = python_operator.PythonOperator(
        task_id='copy_dependencies',
        python_callable=copy_dependencies,
        provide_context=False
     )


     t2 = DataFlowPythonOperator(task_id="composer_dataflow", 
          py_file='/tmp/dataflow/pipeline.py', job_name='job_composer')

     t1 >> t2

这就是我创建 DAG 文件 dataflow.py 的方式,然后在 pipeline.py 中导入的包如下:

from my_modules import commons

它应该可以正常工作,因为文件夹目录对于 VM 来说是可以理解的。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-20
    • 1970-01-01
    • 2018-03-24
    • 2014-06-13
    • 2019-03-14
    • 2021-07-31
    相关资源
    最近更新 更多