【发布时间】:2020-12-15 01:04:16
【问题描述】:
我正在运行一个包含大量任务(大于 5000)的 dag,它在检查 dag 运行后失败并出现以下错误。
[2020-12-15 00:43:53,765] {scheduler_job.py:166} 错误 - 出现异常!传播中...
奇怪的是,如果任务数为 1000 或 2000 或更少,dag 会成功运行。
知道这里发生了什么吗?不知道为什么相同的任务条目再次被重新插入到 mysql 表中。
Airflow 设置为本地执行器,mysql 数据库作为元存储。
下面是 DAG 的代码：
main_dag_id = 'myDag'

# Default arguments inherited by every task in this DAG.
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True,  # pass Airflow context kwargs into the callables
}

# NOTE(review): max_active_runs=1 limits concurrent DagRuns; catchup=False
# prevents backfilling runs between start_date and now.
dag = DAG(
    main_dag_id,
    schedule_interval='0 3,9 * * *',  # run daily at 03:00 and 09:00
    catchup=False,
    max_active_runs=1,
    default_args=args,
)
def start(*args, **kwargs):
    """Callable for the 'start' task; placeholder body.

    The original post used the invalid C-style comment ``//something``
    as the body (a SyntaxError in Python); replaced with a no-op.
    """
    pass
def end(*args, **kwargs):
    """Callable for the 'end' task; placeholder body.

    The original post used the invalid C-style comment ``//something``
    as the body (a SyntaxError in Python); replaced with a no-op.
    """
    pass
def doSomeWork(name, index, *args, **kwargs):
    """Callable for each dynamically generated task; placeholder body.

    Args:
        name: task-family prefix (e.g. 'raw_view').
        index: integer position of the dynamic task within the fan-out.

    The original post used the invalid C-style comment ``//something``
    as the body (a SyntaxError in Python); replaced with a no-op.
    """
    pass
# Boundary tasks for the fan-out/fan-in pattern.
starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[],
)
ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[],
)

# Fan out 5000 parallel tasks between start and end:
#   start >> raw_view_<i> >> end   for each i.
for index in range(5000):
    dynamicTask1 = PythonOperator(
        task_id='raw_view_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['raw_view', index],
    )
    starting_task.set_downstream(dynamicTask1)
    dynamicTask1.set_downstream(ending_task)

# NOTE(review): this direct start -> end edge is redundant — every path
# already reaches 'end' through the dynamic tasks above. Harmless, but
# it can be removed without changing execution order.
starting_task.set_downstream(ending_task)
完整的日志
2020-12-14 23:00:11,272] {scheduler_job.py:1284} INFO - Processing myDag
[2020-12-14 23:00:11,286] {scheduler_job.py:759} INFO - Examining DAG run <DagRun myDag @ 2020-12-14 23:00:09.937961+00:00: manual__2020-12-14T23:00:09.937961+00:00, externally triggered: True>
[2020-12-14 23:00:12,686] {scheduler_job.py:166} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1264, in _execute_context
cursor, statement, parameters, context
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 148, in do_executemany
rowcount = cursor.executemany(statement, parameters)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 279, in executemany
r = self._query(qs)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 371, in _query
rowcount = self._do_query(q)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 335, in _do_query
db.query(q)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
_mysql_exceptions.IntegrityError: (1062, "Duplicate entry 'start-myDag-2020-12-14 23:00:09.937961' for key 'PRIMARY'")
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 158, in _run_file_processor
pickle_dags)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1611, in process_file
self._process_dags(dagbag, dags, ti_keys_to_schedule)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1295, in _process_dags
self._process_task_instances(dag, tis_out)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 780, in _process_task_instances
run.verify_integrity(session=session)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/models/dagrun.py", line 400, in verify_integrity
session.commit()
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 504, in commit
self._prepare_impl()
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
self.session.flush()
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2523, in flush
self._flush(objects)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2664, in _flush
transaction.rollback(_capture_exception=True)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 69, in __exit__
exc_value, with_traceback=exc_tb,
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
raise exception
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2624, in _flush
flush_context.execute()
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1020, in execute
return meth(self, multiparams, params)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1139, in _execute_clauseelement
distilled_params,
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1324, in _execute_context
e, statement, parameters, cursor, context
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1518, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1518, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
raise exception
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1264, in _execute_context
cursor, statement, parameters, context
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 148, in do_executemany
rowcount = cursor.executemany(statement, parameters)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 279, in executemany
r = self._query(qs)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 371, in _query
rowcount = self._do_query(q)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 335, in _do_query
db.query(q)
File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'start-myDag-2020-12-14 23:00:09.937961' for key 'PRIMARY'")
[SQL: INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, pool_slots, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)]
[parameters: (('start', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 5002, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('end', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 1, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_0', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_1', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_2', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_3', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_4', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_5', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.') ... 
displaying 10 of 5002 total bound parameter sets ... ('raw_view_4998', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_4999', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'))]
(Background on this error at: http://sqlalche.me/e/gkpj)
[2020-12-14 23:00:16,205] {scheduler_job.py:154} INFO - Started process (PID=2024199) to work on /dags/myDag.py
[2020-12-14 23:00:16,207] {scheduler_job.py:1562} INFO - Processing file /dags/myDag.py for tasks to queue
[2020-12-14 23:00:16,208] {logging_mixin.py:112} INFO - [2020-12-14 23:00:16,208] {dagbag.py:396} INFO - Filling up the DagBag from /dags/myDag.py
【问题讨论】:
标签: mysql airflow airflow-scheduler