【发布时间】:2020-09-09 02:10:05
【问题描述】:
有没有办法从基于 CDAP 事件的触发器运行 Google 数据融合管道?
第一个要求是,每当新文件到达 GCS 存储桶时。它将触发数据融合管道自动运行。
第二个需求是流水线依赖,比如流水线A没有启动或者失败,流水线B就不能运行。
谢谢
【问题讨论】:
标签: google-cloud-data-fusion cdap
有没有办法从基于 CDAP 事件的触发器运行 Google 数据融合管道?
第一个要求是,每当新文件到达 GCS 存储桶时。它将触发数据融合管道自动运行。
第二个需求是流水线依赖,比如流水线A没有启动或者失败,流水线B就不能运行。
谢谢
【问题讨论】:
标签: google-cloud-data-fusion cdap
查看您的初始用例,我假设对于 第二个 要求,您可能会考虑查看 CDAP 纯组件,例如:Schedules、@987654322 @ 和 Triggers.
通常,为具有某些条件执行模式的底层管道设计运行流程,您可以通过定义保存条件逻辑组合的特定 Workflow 来创建 Schedule 对象在管道之间并应用与您的事件发生匹配的触发器的模型。
根据 CDAP 文档:
工作流程可以由CDAP CLI 和Lifecycle HTTP RESTful API 控制。
如上所述,需要编写一个适当的HTTP请求来
CDAP REST API,包含存储要创建的计划的详细信息的 JSON 对象,基于文档中的 example 以及我创建工作流的进一步参考,而 Pipeline_2 仅在 Pipeline_1 成功时触发:
{
"name": "Schedule_1",
"description": "Triggers Pipeline_2 on the succeding execution of Pipeline_1",
"namespace": "<Pipeline_2-namespace>",
"application": "Pipeline_2",
"version": "<application version of the Pipeline_2>",
"program": {
"programName": "Workflow_name",
"programType": "WORKFLOW"
},
"trigger": {
"type": "PROGRAM_STATUS",
"programId": {
"namespace": "<Pipeline_1-namespace>",
"application": "Pipeline_1",
"version": "<application version of the Pipeline_1>",
"type": "WORKFLOW",
"entity": "PROGRAM",
"program": "Workflow_name"
},
"programStatuses": ["COMPLETED"]
}
}
对于 1st 要求,我不确定是否可以在 Data Fusion/CDAP 原生工具中实现,但我无法看到此类事件,与不断发现 GCS 存储桶:
触发器由事件触发,例如在 数据集,或执行时间触发器的 cron 表达式,或 程序的状态。
在这种情况下,我会查看 GCP Cloud function 和 GCP Composer,写得很好 example,描述了如何将 Cloud Functions 用于基于事件的 DAG 触发器,假设特别是 Composer DAG 文件你可以调用顺序数据融合管道执行。查看此 Stack thread 了解更多详情。
【讨论】: