【问题标题】:Airflow SimpleHttpOperator is not pushing to xcomAirflow SimpleHttpOperator 没有推送到 xcom
【发布时间】:2022-11-26 13:08:35
【问题描述】:

我的 dag 中有以下 SimpleHttpOperator:

extracting_user = SimpleHttpOperator(
        task_id='extracting_user',
        http_conn_id='user_api',
        endpoint='api/', # Some Api already configured and checked
        method="GET",
        response_filter=lambda response: json.loads(response.text),
        log_response=True,
        do_xcom_push=True,
    )

接着是 PythonOperator:

processing_user = PythonOperator(
        task_id='processing_user',
        python_callable=_processing_user
    )

功能:

def _processing_user(ti):
    users = ti.xcom_pull(task_ids=['extracting_user'])
    if not len(users) or 'results' not in users[0]:
        raise ValueError(f'User is empty')

    **More function code**

当我执行 airflow tasks test myDag extracting_user 2022-03-02 后跟 airflow tasks test myDag processing_user 2022-03-02 时,我得到值错误,用户变量等于一个空数组。

我单独测试了 extracting_user 任务,它从 API 获取了所需的数据。我已经用 sqlite xcom 查询过,它是一个空表。

我正在使用气流 2.3.0

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    我解决了更改为airflow 2.0.0版本的问题。似乎 SimpleHttpOperator 没有将请求响应存储在 2.3.0 版本的 xcom 表上

    【讨论】:

      【解决方案2】:

      SimpleHttpOperator 确实返回 XCOM。但是,命令 airflow tasks test 不再创建 XComs,wh

      【讨论】:

        猜你喜欢
        • 2023-02-09
        • 1970-01-01
        • 2019-01-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-03-14
        相关资源
        最近更新 更多