如果我将 DTS 设置为按需提供,我如何从 API 调用它?我正在考虑创建一个每 10 分钟按需调用它的 cronjob。但我无法通过文档弄清楚如何调用它。
StartManualTransferRuns 是RPC library 的一部分,但目前还没有等效的 REST API。如何使用它取决于您的环境。例如,您可以使用 Python 客户端库 (docs)。
作为示例,我使用了以下代码(您需要运行 pip install google-cloud-bigquery-datatransfer 以获得相关性):
import time
from google.cloud import bigquery_datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp
client = bigquery_datatransfer_v1.DataTransferServiceClient()
PROJECT_ID = 'PROJECT_ID'
TRANSFER_CONFIG_ID = '5e6...7bc' # alphanumeric ID you'll find in the UI
parent = client.project_transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)
start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10))
response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
print(response)
请注意,您需要使用正确的传输配置 ID,并且 requested_run_time 必须是 bigquery_datatransfer_v1.types.Timestamp 类型(文档中没有示例)。我将开始时间设置为比当前执行时间提前 10 秒。
您应该得到如下响应:
runs {
name: "projects/PROJECT_NUMBER/locations/us/transferConfigs/5e6...7bc/runs/5e5...c04"
destination_dataset_id: "DATASET_NAME"
schedule_time {
seconds: 1579358571
nanos: 922599371
}
...
data_source_id: "google_cloud_storage"
state: PENDING
params {
...
}
run_time {
seconds: 1579358581
}
user_id: 28...65
}
并且传输按预期触发(不要介意错误):
另外,我将 GCS 文件(不需要 ETL)移动到与确切架构匹配的 bq 表中的第二个最可靠和最便宜的方法是什么。我应该使用 Cloud Scheduler、Cloud Functions、DataFlow、Cloud Run 等吗?
有了这个,您可以设置一个 cron 作业以每十分钟执行一次您的函数。正如 cmets 中所讨论的,最小间隔是 60 分钟,因此它不会拾取少于一小时的文件 (docs)。
除此之外,这不是一个非常强大的解决方案,您的后续问题会在这里发挥作用。我认为这些可能过于宽泛,无法在单个 StackOverflow 问题中解决,但我想说,对于按需刷新,Cloud Scheduler + Cloud Functions/Cloud Run 可以很好地工作。
如果您需要 ETL,Dataflow 将是最好的选择,但它有一个可以监视文件模式的 GCS 连接器 (example)。这样您就可以跳过传输,设置监视间隔和加载作业触发频率以将文件写入 BigQuery。与以前的方法不同,VM 将在流式管道中持续运行,但可以进行 10 分钟的观察期。
如果您有复杂的工作流程/依赖项,Airflow 最近引入了operators 来开始手动运行。
如果我使用 Cloud Function,如何在调用时将 GCS 中的所有文件作为一个 bq 加载作业提交?
您可以在创建传输时使用wildcards 匹配文件模式:
此外,这可以使用Pub/Sub notifications for Cloud Storage 逐个文件完成,以触发云函数。
最后,有人知道 DTS 是否会在未来将限制降低到 10 分钟?
已有功能请求here。随意star它以表达您的兴趣并接收更新