【问题标题】:Airflow task still running even after the endpoint process is completed即使端点进程完成,气流任务仍在运行
【发布时间】:2022-07-13 14:36:38
【问题描述】:

我有一个气流任务,使用 SimpleHTTPOperator 在 Google Cloud Run 中访问服务。 我检查了 Cloud Run Logs,服务本身在 12 分钟内完成,但气流任务一直在运行,我需要手动将其标记为成功。

知道我应该检查什么吗? 我是气流新手,所以我不知道应该检查哪些日志,因为任务日志中没有有用的信息。

我使用 LocalExecutor 来运行 DAG。

【问题讨论】:

  • 可以分享一下运营商的代码吗?

标签: airflow google-cloud-run


【解决方案1】:

在我的团队中,我们使用的是 HTTP 运算符。但是,我们还基于它创建了一个自定义算子来调用 Google Cloud Run。

我们有许多 dag 需要进行多次云运行调用来进行 API 调用以进行数据提取。 Airflow 调用 Cloud Run,Cloud Run 调用 API。

对于其中一些任务,当响应时间高于 8 分钟时,Cloud Run 会向 Airflow 发送 200,但该任务会无限期地继续运行。

我们从 Google Cloud Support 获得的一个建议是使用 HttpSensor 而不是 HttpOperator。 Aparrently,操作员运行任务异步运行,但传感器可以等待一些事件和任务完成。您可以尝试使用 HttpSensor 看看它是否有效。

【讨论】:

    【解决方案2】:

    我遇到了类似的问题,我想我可以提供比最初的帖子更详细的信息。我的团队大量使用 Cloud Composer,对于某些应用程序,我们需要将数据从 Bigquery 插入 azure(驱动程序 17)中的 MSQL 数据库,这需要一些烦人的驱动程序。

    由于没有在 composer 2 上安装它的干净方法,我使用了一个云运行应用程序。我从气流中调用该云运行应用程序,以实际将数据插入到 Azure。

    cloudrun_endpoints = ["update_staging", "update_production"]
    
    @dag(schedule_interval="45 3 * * *", start_date=dates.days_ago(1))
    def bq_to_azure():
     get_cloud_run_token = bash.BashOperator(
            task_id="get_cloud_run_token",
            bash_command=f'gcloud auth print-identity-token "--audiences={cloudrun_uri}"',
        )
        token = "{{ task_instance.xcom_pull(task_ids='get_cloud_run_token') }}"
        cloud_run_tasks = [
            HttpSensor(
                task_id=f"{endpoint}",
                http_conn_id="xxxx",
                headers={"Authorization": f"Bearer {token}"},
                endpoint=endpoint,
                response_check=lambda response:  response.json()["job_status"] == "done" or response.status_code != 200,
                poke_interval=5,
                method="POST",
            )
            for endpoint in cloudrun_endpoints
        ]
    
        chain(get_cloud_run_token, *cloud_run_tasks)
    

    基本上,如果请求需要一些时间,大部分时间气流就会卡住,因为在执行 http 调用的任务中保持在运行模式,而我可以在云运行日志中看到请求以状态 200 完成...

    目前我只是打算将 docker 应用程序实际部署到 GKE 而不是云运行。

    编辑:这发生在 gen1 或 gen2 云运行中。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-16
      • 2019-07-21
      相关资源
      最近更新 更多