我遇到了类似的问题,我想我可以提供比最初的帖子更详细的信息。我的团队大量使用 Cloud Composer,对于某些应用程序,我们需要将数据从 Bigquery 插入 azure(驱动程序 17)中的 MSQL 数据库,这需要一些烦人的驱动程序。
由于没有在 composer 2 上安装它的干净方法,我使用了一个云运行应用程序。我从气流中调用该云运行应用程序,以实际将数据插入到 Azure。
cloudrun_endpoints = ["update_staging", "update_production"]
@dag(schedule_interval="45 3 * * *", start_date=dates.days_ago(1))
def bq_to_azure():
get_cloud_run_token = bash.BashOperator(
task_id="get_cloud_run_token",
bash_command=f'gcloud auth print-identity-token "--audiences={cloudrun_uri}"',
)
token = "{{ task_instance.xcom_pull(task_ids='get_cloud_run_token') }}"
cloud_run_tasks = [
HttpSensor(
task_id=f"{endpoint}",
http_conn_id="xxxx",
headers={"Authorization": f"Bearer {token}"},
endpoint=endpoint,
response_check=lambda response: response.json()["job_status"] == "done" or response.status_code != 200,
poke_interval=5,
method="POST",
)
for endpoint in cloudrun_endpoints
]
chain(get_cloud_run_token, *cloud_run_tasks)
基本上,如果请求需要一些时间,大部分时间气流就会卡住,因为在执行 http 调用的任务中保持在运行模式,而我可以在云运行日志中看到请求以状态 200 完成...
目前我只是打算将 docker 应用程序实际部署到 GKE 而不是云运行。
编辑:这发生在 gen1 或 gen2 云运行中。