【发布时间】:2019-05-20 11:08:30
【问题描述】:
我刚刚在 Python 3 和 Composer 映像版本 composer-1.4.0-airflow-1.10.0 上设置了 Cloud Composer 环境。否则所有设置都是“库存”;即没有配置覆盖。
我正在尝试测试一个非常简单的 DAG。它在我的本地 Airflow 服务器上运行没有问题,但在 Cloud Composer 上,Web 服务器的任务信息视图显示消息 Dependencies Blocking Task From Getting Scheduled
依赖是Unknown,原因如下:
All dependencies are met but the task instance is not running. In most cases this just means that the task will probably be scheduled soon unless:
- The scheduler is down or under heavy load
- The following configuration values may be limiting the number of queueable processes: parallelism, dag_concurrency, max_active_dag_runs_per_dag, non_pooled_task_slot_count
If this task instance does not start soon please contact your Airflow administrator for assistance.
无论任务是按计划运行,还是当我在 Web 服务器中手动触发它时都会发生这种情况(我在执行此操作之前将所有任务实例设置为成功,以避免延迟)。我试过resetting the scheduler in kubernetes as per this answer,但任务仍然停留在计划中。
另外,我注意到在我的本地实例上(在不同的 Docker 容器上运行服务器、工作程序和调度程序),Task Instances 视图中的Hostname 列已填充,但在 Cloud Composer 上却没有。
这是我正在运行的 DAG:
from datetime import datetime, timedelta
import random
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'queue': 'airflow',
'start_date': datetime.today() - timedelta(days=2),
'schedule_interval': None,
'retries': 2,
'retry_delay': timedelta(seconds=15),
'priority_weight': 10,
}
example_dag = DAG(
'example_dag',
default_args=default_args,
schedule_interval=timedelta(days=1)
)
def always_succeed():
pass
always_succeed_operator = PythonOperator(
dag=example_dag,
python_callable=always_succeed,
task_id='always_succeed'
)
def might_fail():
return 1 / random.randint(0, 1)
might_fail_operator = PythonOperator(
dag=example_dag, python_callable=might_fail, task_id='might_fail'
)
might_fail_operator.set_upstream(always_succeed_operator)
【问题讨论】:
标签: google-cloud-platform airflow airflow-scheduler google-cloud-composer