[Question Title]: Airflow (Google Composer) TypeError: can't pickle _thread.RLock objects
[Posted]: 2020-09-09 13:45:43
[Question]:

I'm using Airflow (Google Cloud Composer) and I'm hitting the exception below:

TypeError: can't pickle _thread.RLock objects

Ooops.

                          ____/ (  (    )   )  \___
                         /( (  (  )   _    ))  )   )\
                       ((     (   )(    )  )   (   )  )
                     ((/  ( _(   )   (   _) ) (  () )  )
                    ( (  ( (_)   ((    (   )  .((_ ) .  )_
                   ( (  )    (      (  )    )   ) . ) (   )
                  (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
                  ( (  (   ) (  )   (  ))     ) _)(   )  )  )
                 ( (  ( \ ) (    (_  ( ) ( )  )   ) )  )) ( )
                  (  (   (  (   (_ ( ) ( _    )  ) (  )  )   )
                 ( (  ( (  (  )     (_  )  ) )  _)   ) _( ( )
                  ((  (   )(    (     _    )   _) _(_ (  (_ )
                   (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
                   ((__)        \\||lll|l||///          \_))
                            (   /(/ (  )  ) )\   )
                          (    ( ( ( | | ) ) )\   )
                           (   /(| / ( )) ) ) )) )
                         (     ( ((((_(|)_)))))     )
                          (      ||\(|(|)|/||     )
                        (        |(||(||)||||        )
                          (     //|/l|||)|\\ \     )
                        (/ / //  /|//||||\\  \ \  \ _)
-------------------------------------------------------------------------------
Node: d93e048dc08a
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/opt/python3.6/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/flask_login/utils.py", line 258, in decorated_view
    return func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/www/utils.py", line 290, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/www/utils.py", line 337, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/www/views.py", line 1335, in clear
    include_upstream=upstream)
  File "/usr/local/lib/airflow/airflow/models/dag.py", line 1243, in sub_dag
    for t in regex_match + also_include}
  File "/usr/local/lib/airflow/airflow/models/dag.py", line 1243, in <dictcomp>
    for t in regex_match + also_include}
  File "/opt/python3.6/lib/python3.6/copy.py", line 161, in deepcopy
    y = copier(memo)
  File "/usr/local/lib/airflow/airflow/models/baseoperator.py", line 678, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/opt/python3.6/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/python3.6/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/python3.6/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/python3.6/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 215, in _deepcopy_list
    append(deepcopy(a, memo))
  File "/opt/python3.6/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/opt/python3.6/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/python3.6/lib/python3.6/copy.py", line 169, in deepcopy
    rv = reductor(4)
TypeError: can't pickle _thread.RLock objects

What have I tried?

  1. Clearing the task from the Airflow web UI: doesn't work
  2. Running commands such as backfill from the command line: doesn't work
  3. Restarting the Airflow webserver: doesn't work
  4. Changing the DAG's retry_delay=timedelta(seconds=5)

Can anyone help with the issue above? Many thanks.

I've noticed some similar questions on StackOverflow, but they were never really resolved:

Airflow can't pickle _thread._local objects

Airflow 1.9.0 ExternalTaskSensor retry_delay=30 yields TypeError: can't pickle _thread.RLock objects
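Both linked questions trace back to the same mechanism visible in the traceback above: clearing a task makes the Airflow webserver run `copy.deepcopy` over every operator in the DAG, deepcopy falls back to pickle's reduce protocol for objects it cannot copy structurally, and pickle refuses lock objects. A minimal stdlib sketch reproduces the error without Airflow (`FakeOperator` is a made-up stand-in, not an Airflow class):

```python
import copy
import threading

# Minimal reproduction, no Airflow needed: deepcopy of any object whose
# attributes (directly or nested) hold a lock fails exactly like the
# traceback above, at copy.py's "rv = reductor(4)" pickle fallback.
class FakeOperator:
    def __init__(self):
        self.retry_delay = None
        self.client = threading.RLock()  # stand-in for a cached hook/connection

try:
    copy.deepcopy(FakeOperator())
except TypeError as e:
    # Exact wording varies by Python version, e.g.
    # "can't pickle _thread.RLock objects" on 3.6.
    print(type(e).__name__, e)
```

So the operator (or one of its default arguments) is carrying a live handle of some kind, and the fix is to find and remove that attribute rather than to retry the clear.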

[Comments]:

  • What is your Cloud Composer image version? Are you using custom operators in your DAGs?
  • The image version is composer-1.11.2-airflow-1.10.9. No, I have no custom operators.
  • I ran into a similar problem; in my case I isolated it to a SubDag that depended on other SubDags in a complex relationship. Redesigning the DAG dependency graph to be somewhat simpler made the problem go away. I'm not confident enough in this explanation to make it an answer, but if anyone is stuck on this, simplifying your dependency graph may at least be worth investigating. It worked for me.

Tags: airflow airflow-scheduler google-cloud-composer


[Solution 1]:

A similar issue was reported on the Apache Jira tracker; going through the discussion thread there, I can point out a few things that may help you get past this problem:

  • I'd suggest reviewing the specific DAG and checking that the default arguments of the DAG's operators have the correct types; although retry_delay has already been checked, it's worth reviewing the remaining parameters (the link was already mentioned in the question);

  • For further debugging, verify that your DAG operators use only picklable (serializable) objects, per the comment posted here.

  • I assume we still receive occasional reports from users about clearing Airflow DAG tasks through the Airflow web UI; see this thread. To mitigate the issue, you can delete the failed task with the Airflow command-line tool (e.g. here) or, as a last resort, delete the task_id record from the Airflow metadata database.

    Connect to one of the Composer workers:

    kubectl -it exec $(kubectl get po -l run=airflow-worker -o jsonpath='{.items[0].metadata.name}' \
        -n $(kubectl get ns | grep composer | awk '{print $1}')) -n $(kubectl get ns | grep composer | awk '{print $1}') \
        -c airflow-worker -- mysql -u root -h airflow-sqlproxy-service.default
    

    Using the mysql client:

    mysql> show databases;
    +-----------------------------------------+
    | Database                                |
    +-----------------------------------------+
    | information_schema                      |
    | composer-1-11-3-airflow-1-10-6-* |
    | mysql                                   |
    | performance_schema                      |
    | sys                                     |
    +-----------------------------------------+
    5 rows in set (0.01 sec)   
    

    Open a connection to the composer-1-11-3-airflow-1-10-6-* schema:

    mysql> use `composer-1-11-3-airflow-1-10-6-*`;

    Delete the failed task_id:

    delete from task_instance where task_id='<task_id>' AND execution_date='<execution_date>';
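
Before deleting metadata rows, it may also be worth pinpointing which operator attribute is actually unpicklable, since the traceback only shows the deepcopy machinery. A small standard-library helper sketch (`find_unpicklable_attrs` is an illustrative name, not an Airflow API) that you could run over a suspect task:

```python
import copy

def find_unpicklable_attrs(obj):
    """Return names of obj's attributes that fail to deepcopy.

    Illustrative debugging helper, not part of Airflow: apply it to a
    suspect operator instance to find the attribute (lock, client,
    connection, ...) that triggers the RLock pickling error.
    """
    bad = []
    for name, value in vars(obj).items():
        try:
            copy.deepcopy(value)
        except TypeError:
            bad.append(name)
    return bad
```

For example, iterating over `dag.tasks` and printing `find_unpicklable_attrs(task)` for each one narrows the failure down to a single attribute instead of bisecting the DAG by hand.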
    

[Discussion]:
