【发布时间】:2021-05-02 01:20:54
【问题描述】:
我刚刚开始使用 Airflow-Pentaho-Plugin。我在 Pentaho 数据集成服务器上创建了一个转换,并创建了从 Airflow 到 PDI 的连接。我正在使用 PanOperator 和 KitchenOperator 分别触发 Pentaho 转换和工作。在 DAG 中创建了一个依赖项,如下所示:Transformation >> JOB。即使转换失败,转换的状态在作业图上始终为绿色,并且 JOB 也会被触发。我可以在日志上看到转换失败。我期望应该报告失败并且不运行后续的下游。关于我遗漏了什么或做错了什么的任何建议?我的 DAG 如下所示:
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_pentaho.operators.KitchenOperator import KitchenOperator
from airflow_pentaho.operators.PanOperator import PanOperator
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator
from airflow_pentaho.operators.CarteTransOperator import CarteTransOperator
DAG_NAME = "pdi_example_2"
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['abc@abc.com'],
'retries': 3,
'retry_delay': timedelta(minutes=10),
'email_on_failure': False,
'email_on_retry': False
}
with DAG(dag_id=DAG_NAME,
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
schedule_interval='30 0 * * *') as dag:
trans = PanOperator(
queue="pdi_2",
task_id="pdi_example_2",
directory={},
file="/path/sample.ktr",
trans={},
params={},
dag=dag)
job = KitchenOperator(
queue="pdi_3",
task_id="average_spent",
directory={},
job={},
file="/path/sample.kjb",
params={}, # Date in yyyy-mm-dd format
dag=dag)
trans >> job
【问题讨论】:
-
没有官方的 Pentacho 提供商,因此您需要具体了解问题所在并分享 Pentacho 集成代码。