【问题标题】:SQLAlchemy relationship with task_instance table in airflowSQLAlchemy 与气流中的 task_instance 表的关系
【发布时间】:2018-03-15 17:21:49
【问题描述】:

我正在使用气流,我希望能够在表airflow.file_list 中跟踪给定任务实例生成的所有文件,该表是气流使用的同一数据库的一部分(在 postgres 上运行)。使用 SQLAlchemy,我的 file_list 表有以下映射器:

from airflow.models import Base

class MySourceFile(Base):
    """ SQLAlchemy mapper class for the file_list table entries."""
    __table__ = Table('file_list', Base.metadata,
        Column('UID', Integer, primary_key=True),
        Column('task_id', String(_ID_LEN), nullable=False),
        Column('dag_id', String(_ID_LEN), nullable=False),
        Column('execution_date', DateTime, nullable=False),
        Column('file_path', String(_ID_LEN), nullable=False),
        Column('file_sha256', String(_ID_LEN), nullable=False),
        ForeignKeyConstraint(
            ['task_id', 'dag_id', 'execution_date'],
            ['task_instance.task_id', 'task_instance.dag_id', 'task_instance.execution_date']
        ),
        extend_existing=True,
    )

    instance_task = relationship(
    TaskInstance,
    primaryjoin=and_(
        TaskInstance.task_id == __table__.c.task_id,
        TaskInstance.dag_id == __table__.c.dag_id,
        TaskInstance.execution_date == __table__.c.execution_date
    ),
    viewonly=True,
    foreign_keys=[__table__.c.task_id, __table__.c.dag_id, __table__.c.execution_date]
)

我正在从airflow.modles 导入声明性基础,因为我已经读到交互映射器必须共享同一个基础实例。在上面的代码-sn-p 中,我希望instance_task 引用创建文件的task_instance。我的表airflow.file_list 中的表列task_iddag_idexecution_date 反映了airflow.task_instance 中的主键。不幸的是,当我运行气流服务器时,我收到以下错误:

sqlalchemy.exc.InvalidRequestError:一个或多个映射器无法初始化 - 无法继续初始化其他映射器。触发映射器:'Mapper|MySourceFile|file_list'。最初的例外是:无法确定关系“MySourceFile.instance_task”的关系方向 - 父项和子项的映射表中均不存在外键列

如果可能,我不希望修改气流源。提前感谢您的帮助。

【问题讨论】:

  • 不是您正在寻找的答案,但安装气流和使用您的模型都很好。我会确保使用中的表确实最终具有 fk 约束。您使用的是extend_existing,因此即使该表在此定义之前存在于元数据中且没有 fk,您也应该被覆盖,但请仔细检查。

标签: python sqlalchemy airflow


【解决方案1】:

首先像这样指定__table__ 很奇怪(我以前从未见过),并且通常以和下划线开头的任何内容都是私有的,应该避免。

您需要指定task_id 是外键。像这样的:

task_id = Column(String, ForeignKey('task.id'))

docs on SQL Alchemy relationships 如果您还没有看到它们,它们会很有用。 backref 模式可能有助于创建从任务实例到自定义文件的关系,而无需更改 TaskInstance 模型。

【讨论】:

  • 以我所做的方式指定列/表在文档中称为Hybrid Approach。我以前使用过您建议的声明性方法,但没有成功。
  • 对了。不过,您应该能够在此模式下将ForeignKey(task_instance.id) 传递给 Column() 调用。
  • 哦。我完全错过了它是一个复合外键。
猜你喜欢
  • 2015-12-10
  • 2016-01-26
  • 2011-09-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-02-23
  • 1970-01-01
  • 2011-07-20
相关资源
最近更新 更多