【问题标题】:Airflow S3KeySensor - How to make it continue runningAirflow S3KeySensor - 如何让它继续运行
【发布时间】:2018-05-29 19:48:02
【问题描述】:

this Stackoverflow post 的帮助下,我刚刚制作了一个程序(帖子中显示的那个),当文件被放置在 S3 存储桶中时,我正在运行的 DAG 中的一个任务被触发,然后我使用执行一些工作Bash 运算符。一旦完成,尽管 DAG 不再处于运行状态,而是进入成功状态,如果我想让它拾取另一个文件,我需要清除所有“过去”、“未来”、“上游”、“下游的活动。我想让这个程序始终运行,并且只要将新文件放入 S3 存储桶中,程序就会启动任务。

我是否可以继续使用 S3KeySenor 来执行此操作,或者我是否需要想办法设置一个 External Trigger 来运行我的 DAG?到目前为止,如果我的 S3KeySensor 只运行一次,它就毫无意义。

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')

# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
    task_id='create_emr_cluster_1',
    bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
    retries=1,
    dag=dag)

t1 = BashOperator(
    task_id='success_log',
    bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='new_s3_file_in_foobar-bucket',
    bucket_key='*',
    wildcard_match=True,
    bucket_name='foobar-bucket',
    s3_conn_id='s3://foobar-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)
t2.set_upstream(t1)

我想知道这是否不可能,因为它不会是有向无环图,而是会有一个重复 sensor -> t1 -> t2 -> sensor -> t1 -> 的循环t2 -> 传感器 -> ...不断重复

更新:

我的用例非常简单,只要将新文件放在指定的 AWS S3 存储桶中,我就希望触发 DAG 并开始执行各种任务。这些任务将执行诸如实例化新的 AWS EMR 集群、从 AWS S3 存储桶中提取文件、执行一些 AWS EMR 活动,然后关闭 AWS EMR 集群之类的事情。从那里,DAG 将回到等待状态,等待新文件到达 AWS S3 存储桶,然后无限期地重复该过程。

【问题讨论】:

  • 我在下面发布了一个答案,围绕用例做了一些假设。如果有任何不清楚的地方或我误解了您想要实现的目标,请告诉我。
  • @TaylorEdmiston 我认为您非常了解我想要实现的目标,但我也更新了帖子以包含我的用例。谢谢。
  • 这是有道理的。我认为非常频繁运行的 DAG 可能更容易上手,但外部触发的 DAG 运行对于这个用例来说听起来像是一个更好、更灵活的设置。
  • 对于那些想知道为什么它对他们不起作用的人:导入已过时/不推荐使用。试试这个 1.pip install apache-airflow-providers-amazon 2.from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor

标签: boto3 airflow airflow-scheduler


【解决方案1】:

在 Airflow 中,没有一个概念可以映射到始终运行的 DAG。如果适合您的用例,您可以非常频繁地运行 DAG,例如每 1 到 5 分钟一次。

这里主要是 S3KeySensor 检查直到它检测到第一个文件存在于密钥的通配符路径中(或超时),然后它运行。但是当第二个、第三个或第四个文件着陆时,S3 传感器将已经完成了该 DAG 运行的运行。在下一次 DAG 运行之前,它不会被安排再次运行。 (你描述的循环思想大致相当于调度程序在创建 DAG 运行时所做的事情,但不是永远。)

外部触发器听起来绝对是您的用例的最佳方法,无论该触发器来自 Airflow CLI 的 trigger_dag 命令 ($ airflow trigger_dag ...):

https://github.com/apache/incubator-airflow/blob/972086aeba4616843005b25210ba3b2596963d57/airflow/bin/cli.py#L206-L222

或通过 REST API:

https://github.com/apache/incubator-airflow/blob/5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5/airflow/www/api/experimental/endpoints.py#L41-L89

两者都转身调用通用(实验性)API 中的trigger_dag 函数:

https://github.com/apache/incubator-airflow/blob/089c996fbd9ecb0014dbefedff232e8699ce6283/airflow/api/common/experimental/trigger_dag.py#L28-L67

例如,您可以设置一个 AWS Lambda 函数,当文件到达 S3 时调用该函数,该函数运行触发 DAG 调用。

【讨论】:

  • 好的,谢谢。这就是我所期望的答案。我可以很容易地设置一个 Lambda 函数来像你说的那样进行 REST API 调用;如果我使用 AWS Data Pipeline,这也是我必须做的,我每次都必须使用 Lambda 函数激活它。
  • 这种方法在 2020 年还是一样,还是现在有更好的方法来处理这个问题?
  • @Devender 是的,目前方法仍然相同。我不相信有任何计划从根本上改变它。
  • @TaylorEdmiston 我有一个 DAG 计划每分钟运行一次,第一个任务是 S3KeySensor,它会在 59 秒内超时。我希望 DAG 的一个新实例,即传感器任务的一个新实例,每分钟都会运行一次,但它只运行一次......这应该工作吗?我做错了什么?
  • @ajendrex 你好。是的,您描述的方法在正常运行 DAG 时应该可以工作。如果您通过trigger_dag 调用,则只会创建一个 DAG 运行,而与 DAG 的时间表无关。
【解决方案2】:

另一种方法是使用 S3 触发器 aws lambda,它将使用 api 调用 DAG

s3 事件 -> aws lambda -> Airflow api

设置 S3 通知以触发 lambda

https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html

气流API

https://airflow.apache.org/docs/apache-airflow/stable/rest-api-ref.html

【讨论】:

  • 点评来源: 欢迎提供解决方案的链接,但请确保您的答案在没有它的情况下有用:add context around the link 这样您的其他用户就会知道它是什么以及它为什么存在,然后引用在目标页面不可用的情况下,您链接到的页面中最相关的部分。答案只不过是一个链接may be deleted。见:How do I write a good answer?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-02-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-25
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多