【Posted】: 2020-09-09 13:45:43
【Question】:
I am using Airflow (Google Cloud Composer) and ran into the exception below:
TypeError: can't pickle _thread.RLock objects
Ooops.
-------------------------------------------------------------------------------
Node: d93e048dc08a
-------------------------------------------------------------------------------
Traceback (most recent call last):
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/opt/python3.6/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/flask_login/utils.py", line 258, in decorated_view
return func(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/www/utils.py", line 290, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/www/utils.py", line 337, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/www/views.py", line 1335, in clear
include_upstream=upstream)
File "/usr/local/lib/airflow/airflow/models/dag.py", line 1243, in sub_dag
for t in regex_match + also_include}
File "/usr/local/lib/airflow/airflow/models/dag.py", line 1243, in <dictcomp>
for t in regex_match + also_include}
File "/opt/python3.6/lib/python3.6/copy.py", line 161, in deepcopy
y = copier(memo)
File "/usr/local/lib/airflow/airflow/models/baseoperator.py", line 678, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/opt/python3.6/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/opt/python3.6/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/opt/python3.6/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/opt/python3.6/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 215, in _deepcopy_list
append(deepcopy(a, memo))
File "/opt/python3.6/lib/python3.6/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/opt/python3.6/lib/python3.6/copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/python3.6/lib/python3.6/copy.py", line 169, in deepcopy
rv = reductor(4)
TypeError: can't pickle _thread.RLock objects
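The last frames of the traceback show `copy.deepcopy` failing while Airflow's `__deepcopy__` in `baseoperator.py` copies a task's attributes one by one. The following is a minimal sketch of that failure mode outside Airflow, assuming only the standard library. `FakeOperator` and `try_deepcopy` are hypothetical names used for illustration and are not Airflow classes or functions. The exact wording of the error message differs between Python versions.

```python
import copy
import threading


class FakeOperator:
    """Hypothetical stand-in for a task object; this is not an Airflow class."""

    def __init__(self, task_id):
        self.task_id = task_id
        # Any attribute that owns a lock behaves like this under deepcopy.
        self.lock = threading.RLock()


def try_deepcopy(obj):
    """Return None if deepcopy succeeds, otherwise the TypeError message."""
    try:
        copy.deepcopy(obj)
        return None
    except TypeError as exc:
        return str(exc)


# deepcopy walks the instance's attributes and fails when it reaches the RLock.
error = try_deepcopy(FakeOperator("example_task"))
print(error)
```

This suggests that some object reachable from the task being cleared holds an `RLock`, though the traceback alone does not say which attribute that is.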
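What have I tried?
- Clearing the task from the Airflow UI: did not work
- Running commands such as backfill from the command line: did not work
- Restarting the Airflow web server: did not work
- Changing the DAG's retry_delay to timedelta(seconds=5)
Can anyone help me resolve the issue above? Thank you very much.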
I noticed some similar questions on StackOverflow, but they were never really resolved:
Airflow can't pickle _thread._local objects
Airflow 1.9.0 ExternalTaskSensor retry_delay=30 yields TypeError: can't pickle _thread.RLock objects
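Since the traceback shows each attribute being copied with `copy.deepcopy`, one way to narrow the problem down is to try that copy attribute by attribute and report which ones fail. The helper below is a sketch under that assumption, using only the standard library. `find_uncopyable_attrs` and `FakeTask` are hypothetical names, and the lock-holding `client` attribute is invented for the demonstration.

```python
import copy
import threading


def find_uncopyable_attrs(obj):
    """Return {attribute_name: error} for instance attributes that fail deepcopy."""
    failures = {}
    for name, value in vars(obj).items():
        try:
            copy.deepcopy(value)
        except Exception as exc:  # a lock raises TypeError here
            failures[name] = "{}: {}".format(type(exc).__name__, exc)
    return failures


class FakeTask:
    """Hypothetical task-like object used only for this demonstration."""

    def __init__(self):
        self.task_id = "example_task"
        self.params = {"retries": 3}
        # Stands in for any resource that holds a lock internally.
        self.client = threading.RLock()


failures = find_uncopyable_attrs(FakeTask())
print(sorted(failures))  # ['client']
```

In a real DAG the same helper could be pointed at each task object from a local Python session. Whether that pinpoints the cause in this particular case is not confirmed by the thread.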
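【Comments】:
- What is your Cloud Composer image version? Are you using any custom operators in your DAG?
- The image version is composer-1.11.2-airflow-1.10.9. No, I don't have any custom operators.
- I ran into a similar problem. In my case I isolated it to a SubDag that depended on other SubDags in a complex relationship. Redesigning the DAG dependency graph to be somewhat simpler made the problem go away. I am not confident enough in this explanation to post it as an answer, but if anyone is stuck on this, simplifying your dependency graph may at least be worth investigating. It worked for me.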
Tags: airflow airflow-scheduler google-cloud-composer