【问题标题】:How to invoke an on-demand bigquery Data transfer service?如何调用按需 bigquery 数据传输服务?
【发布时间】:2020-09-23 23:23:19
【问题描述】:

我非常喜欢 BigQuery 的数据传输服务。我有要加载到 BQ 中的确切架构中的平面文件。如果只设置 DTS 计划以获取匹配模式的 GCS 文件并将其加载到 BQ 中,那就太棒了。我喜欢在复制和电子邮件后删除源文件的内置选项,以防万一。但最大的遗憾是最小间隔是 60 分钟。这很疯狂。也许我可以延迟 10 分钟。

如果我将 DTS 设置为按需提供,我如何从 API 调用它?我正在考虑创建一个每 10 分钟按需调用它的 cronjob。但我无法通过文档弄清楚如何调用它。

另外,我将 GCS 文件(无需 ETL)移动到与确切架构匹配的 bq 表中的第二个最可靠和最便宜的方法是什么。我应该使用 Cloud Scheduler、Cloud Functions、DataFlow、Cloud Run 等吗?

如果我使用 Cloud Function,如何在调用时将 GCS 中的所有文件作为一个 bq 加载作业提交?

最后,有人知道 DTS 是否会在未来将限制降低到 10 分钟?

【问题讨论】:

    标签: google-bigquery data-transfer


    【解决方案1】:

    如果我将 DTS 设置为按需提供,我如何从 API 调用它?我正在考虑创建一个每 10 分钟按需调用它的 cronjob。但我无法通过文档弄清楚如何调用它。

    StartManualTransferRunsRPC library 的一部分,但目前还没有等效的 REST API。如何使用它取决于您的环境。例如,您可以使用 Python 客户端库 (docs)。

    作为示例,我使用了以下代码(您需要运行 pip install google-cloud-bigquery-datatransfer 以获得相关性):

    import time
    
    from google.cloud import bigquery_datatransfer_v1
    from google.protobuf.timestamp_pb2 import Timestamp
    
    
    client = bigquery_datatransfer_v1.DataTransferServiceClient()
    
    PROJECT_ID = 'PROJECT_ID'
    TRANSFER_CONFIG_ID = '5e6...7bc'  # alphanumeric ID you'll find in the UI 
    
    parent = client.project_transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)
    
    start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10))
    
    response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
    print(response)
    
    

    请注意,您需要使用正确的传输配置 ID,并且 requested_run_time 必须是 bigquery_datatransfer_v1.types.Timestamp 类型(文档中没有示例)。我将开始时间设置为比当前执行时间提前 10 秒。

    您应该得到如下响应:

    runs {
      name: "projects/PROJECT_NUMBER/locations/us/transferConfigs/5e6...7bc/runs/5e5...c04"
      destination_dataset_id: "DATASET_NAME"
      schedule_time {
        seconds: 1579358571
        nanos: 922599371
      }
      ...
      data_source_id: "google_cloud_storage"
      state: PENDING
      params {
        ...
      }
      run_time {
        seconds: 1579358581
      }
      user_id: 28...65
    }
    

    并且传输按预期触发(不要介意错误):

    另外,我将 GCS 文件(不需要 ETL)移动到与确切架构匹配的 bq 表中的第二个最可靠和最便宜的方法是什么。我应该使用 Cloud Scheduler、Cloud Functions、DataFlow、Cloud Run 等吗?

    有了这个,您可以设置一个 cron 作业以每十分钟执行一次您的函数。正如 cmets 中所讨论的,最小间隔是 60 分钟,因此它不会拾取少于一小时的文件 (docs)。

    除此之外,这不是一个非常强大的解决方案,您的后续问题会在这里发挥作用。我认为这些可能过于宽泛,无法在单个 StackOverflow 问题中解决,但我想说,对于按需刷新,Cloud Scheduler + Cloud Functions/Cloud Run 可以很好地工作。

    如果您需要 ETL,Dataflow 将是最好的选择,但它有一个可以监视文件模式的 GCS 连接器 (example)。这样您就可以跳过传输,设置监视间隔和加载作业触发频率以将文件写入 BigQuery。与以前的方法不同,VM 将在流式管道中持续运行,但可以进行 10 分钟的观察期。

    如果您有复杂的工作流程/依赖项,Airflow 最近引入了operators 来开始手动运行。

    如果我使用 Cloud Function,如何在调用时将 GCS 中的所有文件作为一个 bq 加载作业提交?

    您可以在创建传输时使用wildcards 匹配文件模式:

    此外,这可以使用Pub/Sub notifications for Cloud Storage 逐个文件完成,以触发云函数。

    最后,有人知道 DTS 是否会在未来将限制降低到 10 分钟?

    已有功能请求here。随意star它以表达您的兴趣并接收更新

    【讨论】:

    • 非常感谢您提供非常详细的回答。快速提问,即使我能够触发 DTS 运行,它是否不会查看任何文件,除非超过 60 分钟?我问的原因是我手动触发了 DTS 作业,它说没有找到超过 60 分钟的文件?因此,从创建的那一刻起,我似乎无法每 10 分钟导入一次文件,如果使用 DTS,总是会有 60 分钟的延迟。你能确认一下吗?
    • 是的,似乎记录在 here。如果您需要更频繁的更新,您可以使用 Dataflow 或Pub/Sub notifications for Cloud Storage 来触发 Cloud Function。
    • 这只是一个超级奇怪的限制,文件需要一个小时前才能被拾取。哦,好吧。
    • 我知道“project_transfer_config_path”不是客户端的一部分
    【解决方案2】:

    现在您可以使用 RESTApi 轻松手动运行传输 Bigquery 数据:

    HTTP request
    POST https://bigquerydatatransfer.googleapis.com/v1/{parent=projects/*/locations/*/transferConfigs/*}:startManualRuns
    • 关于这部分 > {parent=projects//locations//transferConfigs/*},检查传输的配置,然后注意如下图所示的部分。

    Here

    更多: https://cloud.google.com/bigquery-transfer/docs/reference/datatransfer/rest/v1/projects.locations.transferConfigs/startManualRuns

    【讨论】:

    • 我相信地址已经改变了。我能够从文档页面运行它。但我需要从云功能运行它,并进行身份验证。您能提供更完整的解决方案吗?
    【解决方案3】:

    根据 Guillem 的回答和 API 更新,这是我的新代码:

        import time
        from google.cloud.bigquery import datatransfer_v1
        from google.protobuf.timestamp_pb2 import Timestamp
    
    
        client = datatransfer_v1.DataTransferServiceClient()
        config = '34y....654'
    
        PROJECT_ID = 'PROJECT_ID'
        TRANSFER_CONFIG_ID = config
    
        parent = client.transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)
    
        start_time = Timestamp(seconds=int(time.time()))
    
        request = datatransfer_v1.types.StartManualTransferRunsRequest(
            { "parent": parent, "requested_run_time": start_time }
        )
    
        response = client.start_manual_transfer_runs(request, timeout=360)
        print(response)
    

    【讨论】:

      【解决方案4】:

      为此,您需要知道正确的TRANSFER_CONFIG_ID

      就我而言,我想列出所有 BigQuery 计划查询,以获取特定 ID。你可以这样做:

      # Put your projetID here
      PROJECT_ID = 'PROJECT_ID'
      
      from google.cloud import bigquery_datatransfer_v1
      
      bq_transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()
      parent = bq_transfer_client.project_path(PROJECT_ID)
      
      # Iterate over all results
      for element in bq_transfer_client.list_transfer_configs(parent):
         
         # Print Display Name for each Scheduled Query
         print(f'[Schedule Query Name]:\t{element.display_name}')
         
         # Print name of all elements (it contains the ID)
         print(f'[Name]:\t\t{element.name}')
         
         # Extract the IDs:
         TRANSFER_CONFIG_ID= element.name.split('/')[-1]
         print(f'[TRANSFER_CONFIG_ID]:\t\t{TRANSFER_CONFIG_ID}')
      
         # You can print the entire element for debug purposes
         print(element)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-05-03
        • 2018-04-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多