【问题标题】:Airflow Composer deleting specific xcom keysAirflow Composer 删除特定的 xcom 键
【发布时间】:2021-09-05 12:26:19
【问题描述】:

我正在通过 Airflow 并行编排多个数据流作业。管道执行的任务之一是推送唯一的 xcom 键来存储每个管道的临时值,该值用于将值插入 Bigquery 表。数据加载完成后,我需要清除特定的 xcom 密钥。我不想删除其他可能由仍在并行运行的其他数据流作业生成的其他 xcom 密钥。他们是清除特定 xcom 密钥的方法吗?

【问题讨论】:

  • 也许你可以在完成后为 xcom 推送一个新值。如果您使用密码等,请考虑改用 secretmanager。

标签: airflow google-cloud-composer


【解决方案1】:

您需要从任务中查询 Airflow Metastore DB 并删除要删除的特定 XCom。目前没有执行特定 XCom 删除的现有方法。

例如使用 TaskFlow API:

    from airflow.models import XCom
    from airflow.utils.session import provide_session

    @task(multiple_outputs=True)
    def push_xcom() -> dict:
        return {"xcom_1": "value_1", "xcom_2": "value_2"}

    @task
    @provide_session
    def delete_xcom(key: str, task_id: str, dag_id: str, session=None) -> None:
        session.query(XCom).filter(
            XCom.key==key, XCom.task_id==task_id, XCom.dag_id==dag_id
        ).delete()
        session.commit()

    push_xcom() >> delete_xcom(key="xcom_2", task_id="push_xcom", dag_id=dag.dag_id)

“push_xcom”任务将推送两个独立的XComs -- xcom_1xcom_2。 (对于此示例,您可以忽略“return_value”XCom,因为当任务具有返回值时,默认情况下会创建特定的 XCom。)

查询元存储数据库确认所有XComs 都已存储。

“delete_xcom”任务将删除键为“xcom_2”的XCom。成功运行该任务后,再次查询元存储数据库显示“xcom_2”键的XCom 行已被删除。

最后,查看 UI 中“push_xcom”任务返回的XComs,键为“xcom_2”的XCom 也不再可见。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-28
    • 1970-01-01
    • 2022-11-26
    相关资源
    最近更新 更多