【发布时间】:2018-05-29 19:48:02
【问题描述】:
在this Stackoverflow post 的帮助下,我刚刚制作了一个程序(帖子中显示的那个),当文件被放置在 S3 存储桶中时,我正在运行的 DAG 中的一个任务被触发,然后我使用执行一些工作Bash 运算符。一旦完成,尽管 DAG 不再处于运行状态,而是进入成功状态,如果我想让它拾取另一个文件,我需要清除所有“过去”、“未来”、“上游”、“下游的活动。我想让这个程序始终运行,并且只要将新文件放入 S3 存储桶中,程序就会启动任务。
我是否可以继续使用 S3KeySenor 来执行此操作,或者我是否需要想办法设置一个 External Trigger 来运行我的 DAG?到目前为止,如果我的 S3KeySensor 只运行一次,它就毫无意义。
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 5, 29),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')
# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
task_id='create_emr_cluster_1',
bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
retries=1,
dag=dag)
t1 = BashOperator(
task_id='success_log',
bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
dag=dag)
sensor = S3KeySensor(
task_id='new_s3_file_in_foobar-bucket',
bucket_key='*',
wildcard_match=True,
bucket_name='foobar-bucket',
s3_conn_id='s3://foobar-bucket',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
t2.set_upstream(t1)
我想知道这是否不可能,因为它不会是有向无环图,而是会有一个重复 sensor -> t1 -> t2 -> sensor -> t1 -> 的循环t2 -> 传感器 -> ...不断重复。
更新:
我的用例非常简单,只要将新文件放在指定的 AWS S3 存储桶中,我就希望触发 DAG 并开始执行各种任务。这些任务将执行诸如实例化新的 AWS EMR 集群、从 AWS S3 存储桶中提取文件、执行一些 AWS EMR 活动,然后关闭 AWS EMR 集群之类的事情。从那里,DAG 将回到等待状态,等待新文件到达 AWS S3 存储桶,然后无限期地重复该过程。
【问题讨论】:
-
我在下面发布了一个答案,围绕用例做了一些假设。如果有任何不清楚的地方或我误解了您想要实现的目标,请告诉我。
-
@TaylorEdmiston 我认为您非常了解我想要实现的目标,但我也更新了帖子以包含我的用例。谢谢。
-
这是有道理的。我认为非常频繁运行的 DAG 可能更容易上手,但外部触发的 DAG 运行对于这个用例来说听起来像是一个更好、更灵活的设置。
-
对于那些想知道为什么它对他们不起作用的人:导入已过时/不推荐使用。试试这个 1.
pip install apache-airflow-providers-amazon2.from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor
标签: boto3 airflow airflow-scheduler