【发布时间】:2018-03-15 17:21:49
【问题描述】:
我正在使用气流,我希望能够在表airflow.file_list 中跟踪给定任务实例生成的所有文件,该表是气流使用的同一数据库的一部分(在 postgres 上运行)。使用 SQLAlchemy,我的 file_list 表有以下映射器:
from airflow.models import Base
class MySourceFile(Base):
""" SQLAlchemy mapper class for the file_list table entries."""
__table__ = Table('file_list', Base.metadata,
Column('UID', Integer, primary_key=True),
Column('task_id', String(_ID_LEN), nullable=False),
Column('dag_id', String(_ID_LEN), nullable=False),
Column('execution_date', DateTime, nullable=False),
Column('file_path', String(_ID_LEN), nullable=False),
Column('file_sha256', String(_ID_LEN), nullable=False),
ForeignKeyConstraint(
['task_id', 'dag_id', 'execution_date'],
['task_instance.task_id', 'task_instance.dag_id', 'task_instance.execution_date']
),
extend_existing=True,
)
instance_task = relationship(
TaskInstance,
primaryjoin=and_(
TaskInstance.task_id == __table__.c.task_id,
TaskInstance.dag_id == __table__.c.dag_id,
TaskInstance.execution_date == __table__.c.execution_date
),
viewonly=True,
foreign_keys=[__table__.c.task_id, __table__.c.dag_id, __table__.c.execution_date]
)
我正在从airflow.modles 导入声明性基础,因为我已经读到交互映射器必须共享同一个基础实例。在上面的代码-sn-p 中,我希望instance_task 引用创建文件的task_instance。我的表airflow.file_list 中的表列task_id、dag_id 和execution_date 反映了airflow.task_instance 中的主键。不幸的是,当我运行气流服务器时,我收到以下错误:
sqlalchemy.exc.InvalidRequestError:一个或多个映射器无法初始化 - 无法继续初始化其他映射器。触发映射器:'Mapper|MySourceFile|file_list'。最初的例外是:无法确定关系“MySourceFile.instance_task”的关系方向 - 父项和子项的映射表中均不存在外键列
如果可能,我不希望修改气流源。提前感谢您的帮助。
【问题讨论】:
-
不是您正在寻找的答案,但安装气流和使用您的模型都很好。我会确保使用中的表确实最终具有 fk 约束。您使用的是
extend_existing,因此即使该表在此定义之前存在于元数据中且没有 fk,您也应该被覆盖,但请仔细检查。
标签: python sqlalchemy airflow