【问题标题】:Writing and importing custom plugins in Airflow在 Airflow 中编写和导入自定义插件
【发布时间】:2019-08-19 14:00:40
【问题描述】:

这实际上是两个问题合二为一。

我的AIRFLOW_HOME 的结构类似于

airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils

我一直在 https://github.com/airflow-plugins 关注 astronomer.io 的示例。我的自定义operators 使用我的自定义hooks,并且所有导入都相对于顶级文件夹plugins

# my_operator.py
from plugins.hooks.my_hook import MyHook

但是,当我尝试将整个存储库移动到插件文件夹中时,运行 airflow list_dags 后出现导入错误,提示找不到 plugins

我阅读了一些关于它的内容,显然 Airflow 将插件加载到其核心模块中,因此它们可以像导入一样被导入

# my_operator.py
from airflow.hooks.my_hook import MyHook

所以我将所有导入更改为直接从airflow.plugin_type 读取。不过,我收到另一个导入错误,这次说找不到my_hook。我每次都重新启动我的工作人员、调度程序和网络服务器,但这似乎不是问题。我查看了类似问题中提出的解决方案,但它们也不起作用。

官方文档也显示了https://airflow.apache.org/plugins.html 扩展AirflowPlugin 类的这种方式,但我不确定这个“接口”应该放在哪里。我也更喜欢拖放选项。

最后,我的代码库显然是 plugins 文件夹本身没有意义,但如果我将它们分开,测试变得不方便。每次在我的钩子/操作上运行单元测试时,我是否必须修改我的 Airflow 配置以指向我的存储库?测试自定义插件的最佳做法是什么?

【问题讨论】:

    标签: airflow


    【解决方案1】:

    我通过反复试验发现了这一点。这是我的AIRFLOW_HOME 文件夹的最终结构

    airflow 
    +-- dags 
    +-- plugins
        +-- __init__.py
        +-- plugin_name.py
        +-- hooks
            +-- __init__.py
            +-- my_hook.py 
            +-- another_hook.py 
        +-- operators
            +-- __init__.py
            +-- my_operator.py 
            +-- another_operator.py 
        +-- sensors 
        +-- utils
    

    plugin_name.py 中,我扩展了AirflowPlugin

    # plugin_name.py
    
    from airflow.plugins_manager import AirflowPlugin
    from hooks.my_hook import *
    from operators.my_operator import *
    from utils.my_utils import *
    # etc
    
    class PluginName(AirflowPlugin):
    
        name = 'plugin_name'
    
        hooks = [MyHook]
        operators = [MyOperator]
        macros = [my_util_func]
    

    在使用我的自定义钩子的自定义运算符中,我像导入它们一样

    # my_operator.py
    
    from hooks.my_hook import MyHook
    

    然后在我的 DAG 文件中,我可以这样做

    # sample_dag.py
    
    from airflow.operators.plugin_name import MyOperator
    

    需要重新启动网络服务器和调度程序。我花了一段时间才弄清楚。

    这也有助于测试,因为自定义类中的导入与文件夹 plugins 中的子模块相关。我想知道是否可以省略 plugins 中的 __init__.py 文件,但由于一切正常,我没有尝试这样做。

    【讨论】:

    • 当我看到你自己的回复时,我说 EUREKA !但是不...按照您的步骤仍然有问题。这是我的 SO 问题(我会在您的解决方案之后尝试)。 stackoverflow.com/questions/58237543/…可以帮忙吗?
    • plugin_name.py 的代码中有一个错字(至少对于当前版本的 Airflow)。它应该来自airflow.pluginS_manager
    • 我得到“损坏的 DAG:[/opt/airflow/dags/my_dag.py] 没有名为 'airflow.hooks.my_plugin' 的模块”
    • @Julio 使用您自己的设置和文件夹结构开始一个单独的问题可能对您更有帮助。
    猜你喜欢
    • 2021-12-05
    • 2020-01-28
    • 2021-08-19
    • 1970-01-01
    • 1970-01-01
    • 2017-10-10
    • 1970-01-01
    • 1970-01-01
    • 2017-01-21
    相关资源
    最近更新 更多