【发布时间】:2017-09-08 20:12:41
【问题描述】:
我试图通过在 Airflow 中使用 celeryExecutor 来运行以下简单的工作流程:
default_args = {
'depends_on_past': False,
'start_date': datetime.now(),
}
dag = DAG('HelloWorld', default_args=default_args, schedule_interval=None)
default_args=default_args)
t1 = BashOperator(
task_id='task_1',
bash_command='echo "Hello World from Task 1"; sleep 0.1',
dag=dag)
t2 = BashOperator(
task_id='task_2',
bash_command='echo "Hello World from Task 2"; sleep 0.2',
dag=dag)
t2.set_upstream(t1)
但是,task_1 和 task_2 之间总是有大约 5 秒的延迟。以下是气流.cfg 快照:
[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 0.1
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 1
看起来 celery 是导致延迟的原因,但如果属实,如何从气流配置或 API 设置 celery 工作人员心跳间隔(或池化速率)?
【问题讨论】:
-
将 'start_date' 设置为 datetime.now() 可能会出现一些问题,请参阅“使用 datetime.now() 的 start_date 会导致不可预知的行为,并且您的 DAG 永远不会启动。建议减去时间跨度以强制调度程序识别 start_date。***"
-
您能否更具体地说明
start_date应该是什么?请注意,我的schedule_interval已设置为None -
对不起,我看错了信息,看起来这是意料之中的,请看这里 - groups.google.com/forum/#!topic/airbnb_airflow/dNskpaOYNQo