【问题标题】:Python 3.6 airflow with a Operator that requires 2.7带有需要 2.7 的 Operator 的 Python 3.6 气流
【发布时间】:2018-07-05 23:32:06
【问题描述】:

我目前正在 python 3.6.5 上运行气流 (1.9.0) 实例。我有一个想要移动到 DAG 的手动工作流程。这个手动工作流程现在需要用 python 2 和 3 编写的代码。让我们将我的 DAG 简化为 3 个步骤:

  1. 为机器处理数据和设置数据的数据流作业 学习培训
  2. Tensorflow ML 训练作业
  3. 其他 我使用 python 3 代码编写的 PythonOperators

数据流作业是用 python 2.7 编写的(谷歌要求),tensorflow 模型代码是在 python 3 中。查看气流 1.9.0 中的“MLEngineTrainingOperator”,有一个 python_version 参数设置“使用的 Python 版本训练中”。

问题:

  • 我可以在worker中动态指定特定的python版本吗 环境?
  • 我是否必须在 python 2.7 上安装气流才能使步骤 1) 运行?
  • 我能否在 python 3 中使用 tensorflow 模型代码,通过在 python 2 上运行的 MlEngineTraining 打包并提交?
  • 我必须在 python 2 中重写我的 3) 运算符吗?

【问题讨论】:

    标签: python tensorflow google-cloud-dataflow airflow


    【解决方案1】:

    没有办法在工作人员上动态指定 python 版本。但是,如果您使用的是 Celery 执行器,您可以在不同的服务器/虚拟机或不同的虚拟环境中运行多个工作程序。

    您可以让一个工作人员运行 python 3,一个运行 2.7,并让每个工作人员监听不同的队列。这可以通过三种不同的方式完成:

    • 在启动 worker 时,您可以添加 -q [queue-name] 标志
    • 设置环境为AIRFLOW__CELERY__DEFAULT_QUEUE
    • 更新airflow.cfg 中[celery] 下的default_queue

    然后在您的任务定义中指定一个queue 参数,根据任务需要运行的python 版本更改队列。

    我不熟悉 MLEngineOperator,但您可以在 PythonOperator 中指定 python_version,它应该在该版本的 virtualenv 中运行它。或者,您可以使用 BashOperator,编写代码以在不同的文件中运行,并指定 python 命令以使用您要使用的 python 版本的绝对路径运行它。

    无论任务如何运行,您只需要确保 DAG 本身与您正在运行它的 python 版本兼容。 IE。如果您要在不同的 python 版本中启动气流工作者,则 DAG 文件本身需要与 python 2 和 3 兼容。 DAG 可以具有附加的文件依赖项,它使用的版本不兼容。

    【讨论】:

    • 所以 MLEngineTrainingOperator 从来没有真正调用任何 python 只是一个带有 python 模块的 http 请求。但是,DataFlowPythonOperator 继承自 BaseOperator 并使用调用 start_python_dataflow 的 DataFlowHook,它没有不同 python 版本的概念。我认为不同的芹菜工人似乎是要走的路
    • 来自airflow.apache.org/configuration.html#setting-up-a-backend:“在worker上执行的操作符需要在该上下文中满足它们的依赖关系。例如,如果您使用HiveOperator,则需要在该框上安装hive CLI ,或者如果您使用 MySqlOperator,则所需的 Python 库需要以某种方式在 PYTHONPATH 中可用”......气流库如何在 python 2 和 3 上运行?我是否必须在 python 2 中使用与 python 3 不同代码的一些运算符?
    • 我猜你是说我必须明确地为此做准备......我只是想知道大多数或所有开箱即用的气流操作符是否在两个 python 2 上运行和 3...
    • 在 python 2 中运行我的 dag 目前正在给我:“文件”/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py”,第 1588 行,在进程返回加载(值)ValueError:不支持的pickle协议:4“
    • 大多数操作员将使用 2 或 3。我建议在 python 3 下运行气流。然后您可以使用 PythonVirtualenvOperator (github.com/apache/incubator-airflow/blob/…) 或设置您的工作人员安装 Python 2但不在你的路径中,然后使用BashOperator指定python 2的绝对路径(即/path/to/python2 /path/to/your_code_that_requires_py2)在Py2中执行脚本。
    【解决方案2】:

    开箱即用,您不能在一般的 python 3 气流集群上运行 python 2 气流工作者:

    Airflow 使用 SQLAlchemy(我相信可以将有关 DAG 的元数据读写到数据库中)。当您在工作人员上运行 DAG 时,它将从数据库中读取有关该 DAG 的腌制信息。如果您的其他非工作组件在 python 3 中,它们将在 pickle 4 中写入数据库,而工作人员将尝试在 python 2 中从数据库中读取。

    在 SQLAlchemy 中特别看一下 sqltypes.py:

    class PickleType(TypeDecorator):
        """Holds Python objects, which are serialized using pickle.
    
        PickleType builds upon the Binary type to apply Python's
        ``pickle.dumps()`` to incoming objects, and ``pickle.loads()`` on
        the way out, allowing any pickleable Python object to be stored as
        a serialized binary field.
    
        To allow ORM change events to propagate for elements associated
        with :class:`.PickleType`, see :ref:`mutable_toplevel`.
    
        """
    
        impl = LargeBinary
    
        def __init__(self, protocol=pickle.HIGHEST_PROTOCOL,
                     pickler=None, comparator=None): 
    

    然后在 compat.py 中最终在 sqltypes.py 中进行酸洗。

    py36 = sys.version_info >= (3, 6)
    py33 = sys.version_info >= (3, 3)
    py35 = sys.version_info >= (3, 5)
    py32 = sys.version_info >= (3, 2)
    py3k = sys.version_info >= (3, 0)
    py2k = sys.version_info < (3, 0)
    py265 = sys.version_info >= (2, 6, 5)
    jython = sys.platform.startswith('java')
    pypy = hasattr(sys, 'pypy_version_info')
    win32 = sys.platform.startswith('win')
    cpython = not pypy and not jython  # TODO: something better for this ?
    
    import collections
    next = next
    
    if py3k:
        import pickle
    else:
        try:
            import cPickle as pickle
        except ImportError:
            import pickle
    

    Also donot_pickle = True in气流似乎对此没有影响???也许是因为根据here,它只与回填有关?

    【讨论】:

      猜你喜欢
      • 2019-12-16
      • 2013-04-16
      • 1970-01-01
      • 1970-01-01
      • 2012-06-04
      • 1970-01-01
      • 2022-06-24
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多