【发布时间】:2021-08-16 14:36:21
【问题描述】:
我正在为 Amazon AirFlow 1.10.12 创建一个插件和 dag 结构。我是按照文档做的:
dags:
- aws_from_redshift_to_s3.py
plugins:
- __init__.py
- from_redshift_to_s3_plugin.py
- operators:
-- __init__.py
-- aws_from_redshift_to_s3_operator.py
aws_from_redshift_to_s3_operator.py:
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.aws_hook import AwsHook
class FromRedshiftToS3TransferOperator(BaseOperator):
pass
from_redshift_to_s3_plugin.py:
from airflow.plugins_manager import AirflowPlugin
from operators.aws_from_redshift_to_s3_operator import FromRedshiftToS3TransferOperator
class FromRedShiftToS3Plugin(AirflowPlugin):
name = 'from_redshift_to_s3_plugin'
operators = [FromRedshiftToS3TransferOperator]
В самом ДАГе подключаю так:
从 operator.from_redshift_to_s3_plugin 导入 FromRedshiftToS3TransferOperator При попытке активировать ДАГ в Amazon AirFlow 1.10.12 получаю ошибку:没有名为“运营商”的模块
【问题讨论】:
-
尝试以这种方式导入您的模块:
from plugins.operators.aws_from_redshift_to_s3_operator import FromRedshiftToS3TransferOperator -
正在尝试 - 不工作(损坏的 DAG:[/usr/local/airflow/dags/aws_from_redshift_to_s3.py] 没有名为“插件”的模块