【发布时间】:2021-11-12 19:49:33
【问题描述】:
我在 Cloud Composer v1.16.16 上运行 Airflowv1.10.15。
我的 DAG 如下所示:
from datetime import datetime, timedelta
# imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large
default_args = {
'owner': 'xxxx',
'depends_on_past': False,
'start_date': datetime(2021, 9, 14),
'email_on_failure': True,
'email': ['xxxx'],
'retries': 1,
'retry_delay': timedelta(minutes=2),
'catchup': False
}
# Define the DAG with parameters
dag = DAG(
dag_id='xxxx_v1',
schedule_interval='0 20 * * *',
default_args=default_args,
catchup=False,
max_active_runs=1,
concurrency=1
)
def wd_to_bq(key, val, **kwargs):
logger.info("workday to BQ ingestion")
workday_extract.fetch_wd_load_bq(key, val)
start_load = DummyOperator(task_id='start', dag=dag)
end_load = DummyOperator(task_id='end', dag=dag)
for key, val in workday_config_large.endpoint_tbl_mapping.items():
# Task 1: Process the unmatched records from the view
workday_to_bq = PythonOperator(
dag=dag,
task_id=f'{key}',
execution_timeout=timedelta(minutes=60),
provide_context=True,
python_callable=wd_to_bq,
op_kwargs={'key': key, 'val': val}
)
start_load >> workday_to_bq >> end_load
任务失败并出现错误 - Task exited with return code Negsignal.SIGKILL。 python 脚本在我的本地机器上运行良好,并在 15 分钟内完成。有多个从中提取报告的端点。但是,耗时最长(约 15 分钟)的那个会因此错误而失败,而其他的会成功。
我尝试了很多选项,但似乎都没有。有人可以帮忙吗?
【问题讨论】:
-
Cloud Composer 为您提供了一个监控仪表板。我建议只运行失败的任务,并在此期间检查 Airflow 工作人员的内存和 CPU 压力。这会告诉你需要增加哪些资源。
-
如果我的回答解决了您的问题,请考虑接受并点赞。如果没有,请告诉我,以便我改进答案。
标签: python-3.x google-cloud-platform airflow google-cloud-composer sigkill