【Question Title】: Airflow got error while processing large number of tasks
【Posted】: 2020-12-15 01:04:16
【Question Description】:

I am running a DAG with a large number of tasks (more than 5000), and it fails with the following error after examining the DAG run.

[2020-12-15 00:43:53,765] {scheduler_job.py:166} ERROR - Got an exception! Propagating...

Strangely, the DAG runs successfully when the task count is 1000 or 2000 or less.

Any idea what is going on here? I don't understand why the same task-instance entries are being re-inserted into the MySQL table.

Airflow is set up with the LocalExecutor and a MySQL database as the metastore.

Below is the DAG code:

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

main_dag_id = 'myDag'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval='0 3,9 * * *',
    catchup=False,
    max_active_runs=1,
    default_args=args
    )

def start(*args, **kwargs):
    # something
    pass

def end(*args, **kwargs):
    # something
    pass

def doSomeWork(name, index, *args, **kwargs):
    # something
    pass

starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])
    
ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

for index in range(5000):
    dynamicTask1 = PythonOperator(
        task_id='raw_view_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['raw_view', index])

    starting_task.set_downstream(dynamicTask1)
    dynamicTask1.set_downstream(ending_task)

starting_task.set_downstream(ending_task)

Full log:

[2020-12-14 23:00:11,272] {scheduler_job.py:1284} INFO - Processing myDag
[2020-12-14 23:00:11,286] {scheduler_job.py:759} INFO - Examining DAG run <DagRun myDag @ 2020-12-14 23:00:09.937961+00:00: manual__2020-12-14T23:00:09.937961+00:00, externally triggered: True>
[2020-12-14 23:00:12,686] {scheduler_job.py:166} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1264, in _execute_context
    cursor, statement, parameters, context
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 148, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 279, in executemany
    r = self._query(qs)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 371, in _query
    rowcount = self._do_query(q)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 335, in _do_query
    db.query(q)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.IntegrityError: (1062, "Duplicate entry 'start-myDag-2020-12-14 23:00:09.937961' for key 'PRIMARY'")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 158, in _run_file_processor
    pickle_dags)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1611, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1295, in _process_dags
    self._process_task_instances(dag, tis_out)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 780, in _process_task_instances
    run.verify_integrity(session=session)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/airflow/models/dagrun.py", line 400, in verify_integrity
    session.commit()
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2523, in flush
    self._flush(objects)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2664, in _flush
    transaction.rollback(_capture_exception=True)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 69, in __exit__
    exc_value, with_traceback=exc_tb,
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2624, in _flush
    flush_context.execute()
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
    c = cached_connections[connection].execute(statement, multiparams)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1020, in execute
    return meth(self, multiparams, params)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1139, in _execute_clauseelement
    distilled_params,
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1324, in _execute_context
    e, statement, parameters, cursor, context
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1518, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1518, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1264, in _execute_context
    cursor, statement, parameters, context
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 148, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 279, in executemany
    r = self._query(qs)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 371, in _query
    rowcount = self._do_query(q)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/cursors.py", line 335, in _do_query
    db.query(q)
  File "/blahblah/AIRFLOW-1.10.10-python3.6.8_e3/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'start-myDag-2020-12-14 23:00:09.937961' for key 'PRIMARY'")
   [SQL: INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, pool_slots, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)]
   [parameters: (('start', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 5002, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('end', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 1, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_0', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_1', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_2', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_3', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_4', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_5', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.')  ... 
displaying 10 of 5002 total bound parameter sets ...  ('raw_view_4998', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'), ('raw_view_4999', 'myDag', datetime.datetime(2020, 12, 14, 23, 0, 9, 937961), None, None, None, None, 0, 0, '', 'duser', None, 'default_pool', 1, 'default', 2, 'PythonOperator', None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.'))]
(Background on this error at: http://sqlalche.me/e/gkpj)
[2020-12-14 23:00:16,205] {scheduler_job.py:154} INFO - Started process (PID=2024199) to work on /dags/myDag.py
[2020-12-14 23:00:16,207] {scheduler_job.py:1562} INFO - Processing file /dags/myDag.py for tasks to queue
[2020-12-14 23:00:16,208] {logging_mixin.py:112} INFO - [2020-12-14 23:00:16,208] {dagbag.py:396} INFO - Filling up the DagBag from /dags/myDag.py
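For context on the failure above: in the Airflow 1.10 metastore, the `task_instance` table's primary key is the composite (task_id, dag_id, execution_date), which matches the duplicate key `'start-myDag-2020-12-14 23:00:09.937961'` in the log. So when the scheduler re-examines a run whose rows were already committed (for example after a file-processor timeout) and bulk-inserts the same tuples again, MySQL raises error 1062. A minimal self-contained illustration of that failure mode (using sqlite3 for simplicity, not Airflow's actual schema):

```python
import sqlite3

# Illustration only: mimic the composite primary key of Airflow's
# task_instance table (task_id, dag_id, execution_date).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "  task_id TEXT NOT NULL,"
    "  dag_id TEXT NOT NULL,"
    "  execution_date TEXT NOT NULL,"
    "  PRIMARY KEY (task_id, dag_id, execution_date))"
)
row = ("start", "myDag", "2020-12-14 23:00:09.937961")
conn.execute("INSERT INTO task_instance VALUES (?, ?, ?)", row)

# Inserting the same key tuple a second time fails, just as the
# scheduler's bulk executemany does when the rows already exist.
try:
    conn.execute("INSERT INTO task_instance VALUES (?, ?, ?)", row)
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True
print("duplicate rejected:", duplicate_rejected)
```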

【Discussion】:

    Tags: mysql airflow airflow-scheduler


    【Solution 1】:

    In case anyone else runs into the same problem: the fix was to increase dag_file_processor_timeout. 300 seconds worked, whereas the default is 50 seconds.

    【Discussion】:
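    One way to apply this (assuming Airflow 1.10.x, where the option lives in the `[core]` section) is to edit airflow.cfg or export the corresponding environment variable before starting the scheduler:

    ```shell
    # In airflow.cfg:
    #   [core]
    #   dag_file_processor_timeout = 300
    # or equivalently via environment variable:
    export AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=300
    ```

    Raising this gives the DAG file processor enough time to finish `verify_integrity` for all 5000+ task instances before the scheduler kills and restarts it, which is what was causing the duplicate inserts.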
