【问题标题】:How to run a task if a previous one fails in Apache Airflow如果前一个任务在 Apache Airflow 中失败,如何运行任务
【发布时间】:2017-06-22 21:35:12
【问题描述】:

我需要做以下事情:

检查服务器是否启动,如果是,我检查该服务器中的 Spark 集群是否启动,如果它关闭,我尝试启动它,如果它已经启动,我继续运行我的 Spark 作业。

我想创建一个任务来检查 Spark 集群是否启动(也许尝试运行一个简单的 Spark 作业)。如果失败,我将启动“启动 Spark 集群”任务。

我正在使用 Airflow,但没有找到触发任务的方法,以防前一个任务失败。除此之外,我需要检查前一个以防它成功,以便它会分支到 Spark 作业任务并跳过“启动 Spark 集群”任务。

如果您能提供一些样品,那就太好了。我尝试使用 trigger_rule 和分支运算符,但到目前为止一无所获。可能是因为网上关于它们的代码示例太少了。

谢谢

【问题讨论】:

  • 将触发规则设置为 all_failed 或 one_failed 应该完全符合您的要求。例如,只需在下游创建两个任务并将一个设置为成功,一个设置为两个失败。顺便提一句。如果您说您正在启动一个集群,我假设您使用云提供商。应该有更好的方法来检查集群是否启动,例如使用云供应商命令行工具。如果您使用的是 GCP,甚至还有启动 dataproc 集群并提交作业的气流操作员。
  • 谢谢 Gindele,我都试试你的方法。关于集群,我们有一个独立的 spark 集群在我们自己的服务器上运行,没有云。无论如何,如果您有任何建议,我们都非常欢迎。

标签: python triggers controls airflow


【解决方案1】:

您可以使用 PythonBranhOperator。 这是一个代码示例供您参考。

VALID_TEST_OUTPUT = 'valid-output'
INVALID_TEST_OUTPUT = 'invalid-test-output'


test_job=PythonOperator(
    ## your task
)

def select_branch_after_test(**kwargs):
    '''
    :return: returns the task_id to proceed with.
    '''
    if com.getoutput('hadoop fs -ls /user/praveen/.test_out/ | grep file1'):
        follow_previous_job = VALID_TEST_OUTPUT
    else:
        follow_previous_job = INVALID_TEST_OUTPUT
    return follow_previous_job


# branching based on successful test output
branching = BranchPythonOperator(task_id=BRANCH_TEST_OUTPUT,
                                 python_callable=select_branch_after_test,
                                 dag=dag)
branching.set_upstream(test_job)

valid_test_output = PythonOperator(
    task_id=VALID_TEST_OUTPUT,
    python_callable=lambda **kwargs: logging.info("test genereated valid output, proceeding with other steps"),
    provide_context=True,
    dag=dag)
valid_test_output.set_upstream(branching)

end_task = PythonOperator(
    task_id=END,
    python_callable=pipeline_run_time,
    provide_context=True,
    trigger_rule='one_success',  ## Important
    dag=dag)

invalid_test_output = PythonOperator(
    task_id=INVALID_TEST_OUTPUT,
    python_callable=lambda **kwargs: logging.info("test did not create valid output, completeing branching dag to other path"),
    provide_context=True,
    dag=dag)
invalid_test_output.set_upstream(branching)

valid_test_output.set_downstream(end_task)
invalid_test_output.set_downstream(end_task)

`

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-10-12
    • 2021-06-05
    相关资源
    最近更新 更多