开箱即用,您不能在一般的 python 3 气流集群上运行 python 2 气流工作者:
Airflow 使用 SQLAlchemy(我相信可以将有关 DAG 的元数据读写到数据库中)。当您在工作人员上运行 DAG 时,它将从数据库中读取有关该 DAG 的腌制信息。如果您的其他非工作组件在 python 3 中,它们将在 pickle 4 中写入数据库,而工作人员将尝试在 python 2 中从数据库中读取。
在 SQLAlchemy 中特别看一下 sqltypes.py:
class PickleType(TypeDecorator):
"""Holds Python objects, which are serialized using pickle.
PickleType builds upon the Binary type to apply Python's
``pickle.dumps()`` to incoming objects, and ``pickle.loads()`` on
the way out, allowing any pickleable Python object to be stored as
a serialized binary field.
To allow ORM change events to propagate for elements associated
with :class:`.PickleType`, see :ref:`mutable_toplevel`.
"""
impl = LargeBinary
def __init__(self, protocol=pickle.HIGHEST_PROTOCOL,
pickler=None, comparator=None):
然后在 compat.py 中最终在 sqltypes.py 中进行酸洗。
py36 = sys.version_info >= (3, 6)
py33 = sys.version_info >= (3, 3)
py35 = sys.version_info >= (3, 5)
py32 = sys.version_info >= (3, 2)
py3k = sys.version_info >= (3, 0)
py2k = sys.version_info < (3, 0)
py265 = sys.version_info >= (2, 6, 5)
jython = sys.platform.startswith('java')
pypy = hasattr(sys, 'pypy_version_info')
win32 = sys.platform.startswith('win')
cpython = not pypy and not jython # TODO: something better for this ?
import collections
next = next
if py3k:
import pickle
else:
try:
import cPickle as pickle
except ImportError:
import pickle
Also donot_pickle = True in气流似乎对此没有影响???也许是因为根据here,它只与回填有关?