【Question Title】: Upload Airflow XCom to Google Cloud Storage as JSON File
【Posted】: 2025-11-23 18:35:04
【Question Description】:

In Airflow, I have a task with ID customer_schema, and I want to write its XCom value to a JSON file named final_schema.json and upload it to Google Cloud Storage. My bucket in Google Cloud Storage is named northern_industrial_customer. I tried the following FileToGoogleCloudStorageOperator, but it doesn't work.

Does anyone know how I can transfer the XCom value of my customer_schema task to Google Cloud Storage as a JSON file named final_schema.json?

transfer_to_gcs = FileToGoogleCloudStorageOperator(
    task_id='transfer_to_gcs',
    src="{{ task_instance.xcom_pull(task_ids='customer_schema') }}",
    dst='final_schema.json',
    bucket='northern_industrial_customer',
    google_cloud_storage_conn_id=conn_id_gcs)

【Question Discussion】:

    Tags: google-cloud-platform google-cloud-firestore google-bigquery google-cloud-storage airflow


    【Solution 1】:

    Airflow has no built-in operator that uploads a string of content directly, but Airflow is extensible, so you can write your own custom operator. (Your attempt fails because the src argument of FileToGoogleCloudStorageOperator must be a path to a local file, whereas your XCom value is the content itself.)

    import tempfile

    from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults
    
    
    class ContentToGoogleCloudStorageOperator(BaseOperator):
        """
        Uploads a text content to Google Cloud Storage.
        Optionally can compress the content for upload.
    
        :param content: Content to upload. (templated)
        :type content: str
        :param dst: Destination path within the specified bucket, it must be the full file path
            to destination object on GCS, including GCS object (ex. `path/to/file.txt`) (templated)
        :type dst: str
        :param bucket: The bucket to upload to. (templated)
        :type bucket: str
        :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
        :type gcp_conn_id: str
        :param mime_type: The mime-type string
        :type mime_type: str
        :param delegate_to: The account to impersonate, if any
        :type delegate_to: str
        :param gzip: Allows for file to be compressed and uploaded as gzip
        :type gzip: bool
        """
        template_fields = ('content', 'dst', 'bucket')
    
        @apply_defaults
        def __init__(self,
                     content,
                     dst,
                     bucket,
                     gcp_conn_id='google_cloud_default',
                     mime_type='application/octet-stream',
                     delegate_to=None,
                     gzip=False,
                     *args,
                     **kwargs):
            super().__init__(*args, **kwargs)
    
            self.content = content
            self.dst = dst
            self.bucket = bucket
            self.gcp_conn_id = gcp_conn_id
            self.mime_type = mime_type
            self.delegate_to = delegate_to
            self.gzip = gzip
    
        def execute(self, context):
            """
            Uploads the content to Google Cloud Storage.
            """
            hook = GoogleCloudStorageHook(
                google_cloud_storage_conn_id=self.gcp_conn_id,
                delegate_to=self.delegate_to
            )
    
            # Open in text mode: the default "w+b" would reject a str payload
            with tempfile.NamedTemporaryFile(mode="w", prefix="gcs-local") as file:
                file.write(self.content)
                file.flush()
                hook.upload(
                    bucket_name=self.bucket,
                    object_name=self.dst,
                    mime_type=self.mime_type,
                    filename=file.name,
                    gzip=self.gzip,
                )
    
    
    transfer_to_gcs = ContentToGoogleCloudStorageOperator(
        task_id='transfer_to_gcs',
        content="{{ task_instance.xcom_pull(task_ids='customer_schema') }}",
        dst='final_schema.json',
        bucket='northern_industrial_customer',
        gcp_conn_id=conn_id_gcs)
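    The heart of execute() above is materializing the templated string as a local file so the hook can be given a filename. That pattern can be sketched standalone with the standard library alone (no Airflow required; write_content_to_tempfile is a hypothetical helper name used here for illustration):

    ```python
    import tempfile


    def write_content_to_tempfile(content: str) -> str:
        """Write string content to a named temporary file and return its path.

        Mirrors what ContentToGoogleCloudStorageOperator.execute does before
        handing the path to hook.upload(filename=...).
        """
        # mode="w" because NamedTemporaryFile defaults to binary mode ("w+b"),
        # which rejects str; delete=False keeps the path valid after closing,
        # so the caller is responsible for removing the file.
        with tempfile.NamedTemporaryFile(
            mode="w", prefix="gcs-local", suffix=".json", delete=False
        ) as f:
            f.write(content)
            return f.name
    ```

    Inside the operator a plain context manager (as above in execute) is preferable, since the file only needs to outlive the upload call.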
    

    Note that in Airflow 2.0, the google_cloud_storage_conn_id parameter of FileToGoogleCloudStorageOperator has been deprecated; you should use gcp_conn_id instead.
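    Also note that in Airflow 2.x the import paths above have moved: the hook lives at airflow.providers.google.cloud.hooks.gcs.GCSHook (in the apache-airflow-providers-google package), and its upload() method accepts a data argument, which makes the temporary file unnecessary. A hedged sketch of an equivalent task for your DAG, assuming that provider package is installed (task and callable names are illustrative):

    ```python
    from airflow.operators.python import PythonOperator


    def _upload_schema_to_gcs(**context):
        # Imported inside the callable so the DAG file still parses on
        # workers without the provider package installed.
        from airflow.providers.google.cloud.hooks.gcs import GCSHook

        content = context["ti"].xcom_pull(task_ids="customer_schema")
        GCSHook(gcp_conn_id="google_cloud_default").upload(
            bucket_name="northern_industrial_customer",
            object_name="final_schema.json",
            data=content,  # upload the string directly, no temp file
            mime_type="application/json",
        )


    # Place inside your DAG definition:
    transfer_to_gcs = PythonOperator(
        task_id="transfer_to_gcs",
        python_callable=_upload_schema_to_gcs,
    )
    ```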

    【Discussion】:
