【问题标题】:Google DataFlow Updating an existing pipelineGoogle DataFlow 更新现有管道
【发布时间】:2022-09-27 19:59:32
【问题描述】:
我正在尝试更新正在运行的数据流作业。
遵循本指南:https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline
我使用 pipeline.run() 从 pom 文件创建的选项执行了一个管道:
并且能够使用我的自定义模板在数据流上运行新作业
gcloud dataflow jobs run myJobName *arguments*
当我尝试更新作业时,我将添加指南中提到的接下来的两个参数:<argument>--update</argument> <argument>--jobName=${jobName}</argument>
我正在执行管道(使用 pipeline.run()),然后我想用新模板更新旧作业。
我可以看到我的新模板在那里,我可以使用以下命令从中创建新作业:
gcloud dataflow jobs run myJobName *arguments*
但我得到的只是一份新工作,而我的旧工作没有更新。
我错过了什么吗?当他们在指南中提到“启动新作业”时,他们是在谈论执行管道(使用 pipeline.run())还是从新模板运行作业?
标签:
google-cloud-platform
google-cloud-dataflow
【解决方案1】:
我一直在阅读这些相同的文档,同时设置 CICD 流程以将 updates 部署到我的 Dataflow 流作业。
我相信这是预期的行为,因为gcloud dataflow run ... 将:
-
drainjob_name 的现有工作
- 使用相同的
job_name(但新的job_id)创建一个新作业
- 启动新的
job_id
作为drain 的一部分,当前正在处理的所有消息都将完成,防止重复和丢失(意外的ACK)。
但我得到的只是一份新工作,而我的旧工作没有更新。
我错过了什么吗?
作为参考,我在部署过程中运行的命令是:
- 要构建上传模板:
echo "Deploying ApacheBeam template to GCS..."
python3 ${_SCRIPT_LOCATION} \
--project=${_PROJECT_ID} \
--template_location=${_TEMPLATE_LOCATION} \
--temp_location=${_TEMPORARY_LOCATION} \
--region=us-central1 \
--runner=DataflowRunner \
--staging_location=${_STORAGE_LOCATION} \
--streaming \
--update \
--job_name=${_JOB_NAME}
- 使用模板重新创建/启动
Job_Name:
echo -e "Updating ${_JOB_NAME} to point at new ApacheBeam template.."
gcloud dataflow jobs run ${_JOB_NAME} --gcs-location ${_TEMPLATE_LOCATION}
如果这有帮助,请 lmk 或在下面的 cmets 中询问跟进
【解决方案2】:
感谢您提供上述信息...想知道,您是如何获得 _TEMPLATE_LOCATION 的? ..您是否对其进行了硬编码或者您在运行时获取它? ...我有兴趣知道我们如何在运行时为作业获取它。
感谢您的意见。
谢谢