【问题标题】:How to execute Cloud Run containers into an Airflow DAG?如何将 Cloud Run 容器执行到 Airflow DAG 中?
【发布时间】:2023-04-06 01:48:01
【问题描述】:

我正在尝试使用 Cloud Run 作为 Airflow 的 DAG 的任务来运行容器。

似乎没有像 CloudRunOperator 或类似的东西,我在文档中找不到任何东西(Cloud Run 和 Airflow 之一)。

有人处理过这个问题吗? 如果是,如何使用 Cloud Run 运行容器并处理 xcom?

提前致谢!!

【问题讨论】:

    标签: google-cloud-platform airflow google-cloud-run google-cloud-composer


    【解决方案1】:

    当容器部署到 Cloud Run 时,AFAIK 会自动侦听可能要发送的请求。参考document

    相反,您可以发送请求以访问已部署的容器。您可以使用下面的代码来做到这一点。

    此 DAG 具有三个任务 print_tokentask_get_opprocess_data

    • print_token 打印对已部署 Cloud Run 容器的请求进行身份验证所需的身份令牌。我使用“xcom_pull”获取“BashOperator”的输出并将身份验证令牌分配给token,因此这可用于对您将执行的HTTP请求进行身份验证。
    • task_get_op 对连接 cloud_run(仅包含 Cloud Run 端点)执行 GET 并为身份验证定义标头 'Authorization': 'Bearer ' + token
    • process_data 对“task_get_op”执行“xcom_pull”以获取输出并使用 PythonOperator 打印。
    import datetime
    
    import airflow
    from airflow.operators import bash
    from airflow.operators import python
    from airflow.providers.http.operators.http import SimpleHttpOperator
    
    YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
    default_args = {
        'owner': 'Composer Example',
        'depends_on_past': False,
        'email': [''],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=5),
        'start_date': YESTERDAY,
    }
    
    with airflow.DAG(
            'composer_http_request',
            'catchup=False',
            default_args=default_args,
            schedule_interval=datetime.timedelta(days=1)) as dag:
    
        print_token = bash.BashOperator(
            task_id='print_token', 
            bash_command='gcloud auth print-identity-token "--audiences=https://hello-world-fri824-ab.c.run.app"' # The end point of the deployed Cloud Run container
        ) 
    
        token = "{{ task_instance.xcom_pull(task_ids='print_token') }}" # gets output from 'print_token' task
    
        task_get_op = SimpleHttpOperator(
            task_id='get_op',
            method='GET',
            http_conn_id='cloud_run',
            headers={'Authorization': 'Bearer ' + token },
        )
    
        def process_data_from_http(**kwargs):
          ti = kwargs['ti']
          http_data = ti.xcom_pull(task_ids='get_op')
          print(http_data)
    
        process_data = python.PythonOperator(
          task_id='process_data_from_http',
          python_callable=process_data_from_http,
          provide_context=True
          )
        print_token >> task_get_op >> process_data
    
    

    cloud_run 连接:

    输出(图表):

    print_token 日志:

    task_get_op 日志:

    process_data 日志(来自 GET 的输出):

    注意:我正在使用 Cloud Composer 1.17.7 和 Airflow 2.0.2 并安装了 apache-airflow-providers-http 以便能够使用 SimpleHttpOperator

    【讨论】:

    • 感谢 Ricco,非常有趣的解决方案!但这意味着让容器始终启动并运行,最终等待“做某事”的调用。我正在寻找类似 KubernetesPodOperator 的东西,它允许我部署我的应用程序,甚至等待撕毁时间!
    • 如果您担心 Cloud Run 的成本,基于Cloud Run pricing,即使部署容器也不会产生费用,它只会在处理请求时开始收费。我对 KubernetesPodOperator 不是很熟悉。您可以尝试创建一个 PythonOperator 来自动在 Cloud Run 中进行部署,它的缺点是工作量很大。
    猜你喜欢
    • 1970-01-01
    • 2020-11-02
    • 2018-05-13
    • 1970-01-01
    • 2020-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-26
    相关资源
    最近更新 更多