【Question Title】: How do I pass a parameter in a Workflow Template Spark job?
【Posted】: 2021-10-24 21:38:48
【Question Description】:

I have a problem with my Spark Dataproc workflow.

This works when submitted directly:

gcloud dataproc jobs submit spark \
--project myproject \
--cluster=mycluster \
--region=europe-west3 \
--jars=gs://path/file.jar,gs://path/depende.jar \
--class=it.flow \
--properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0 \
-- 20210820 010000 000 0 000 TRY

I created a Dataproc workflow template and Python code that launches it through Composer, and it works.

Now I have to make the final arguments dynamic (-- 20210820 010000 000 0 000 TRY).

However, I cannot pass parameters to the workflow:

gcloud dataproc workflow-templates create try1 --region=europe-west3
 
gcloud dataproc workflow-templates add-job spark \
--workflow-template=try1 \
--step-id=create_try1 \
--class=it.flow \
--region=europe-west3 \
--jars=gs://path/file.jar,gs://path/depende.jar \
--properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0 \
-- $arg1 $arg2
 
gcloud dataproc workflow-templates set-cluster-selector TRY1  --region=europe-west3 --cluster-labels=goog-dataproc-cluster-name=cluster

This call:

gcloud dataproc workflow-templates instantiate TRY1  --region=europe-west3 --parameters="arg1=20210820"

fails with the following error:

ERROR: (gcloud.dataproc.workflow-templates.instantiate) INVALID_ARGUMENT: Template does not contain parameter with name arg1.

How can I fix this?

The yaml file:

id: create_file
jobs:
- sparkJob:
    args:
    - ARG1
    - ARG2
    jarFileUris:
    - gs://mybucket/try_file.jar
    - gs://mybucket/try_dependencies_2.jar
    mainClass: org.apache.hadoop.examples.tryFile
    properties:
      spark.driver.cores: '2'
      spark.driver.memory: 10g
      spark.driver.userClassPathFirst: 'true'
      spark.dynamicAllocation.enabled: 'false'
      spark.executor.cores: '3'
      spark.executor.memory: 5g
      spark.executor.userClassPathFirst: 'true'
      spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
      spark.num.executors: '2'
  stepId: create_file_try
  parameters:
- name: ARG1
  fields:
  - jobs['create_file_try'].sparkJob.args[0]
- name: ARG2
  fields:
  - jobs['create_file_try'].sparkJob.args[1]
name: projects/My-project-id/regions/europe-west3/workflowTemplates/create_file
updateTime: '2021-08-25T07:49:59.251096Z'

【Question Discussion】:

    Tags: python apache-spark workflow google-cloud-dataproc


    【Solution 1】:

    To have your workflow template accept parameters, it is best to work with a yaml file. When you run the full gcloud dataproc workflow-templates add-job spark command, you can obtain that yaml file: the command prints the resulting yaml configuration on the CLI.

    In this example, I simply used the sample code from the Dataproc documentation together with your --properties values for testing.

    Note: I used a dummy project-id in the yaml file for this example. Make sure you use your actual project-id so you don't run into any issues.

    Sample command:

    gcloud dataproc workflow-templates add-job spark \
    --workflow-template=try1 \
    --step-id=create_try1 \
    --class=org.apache.hadoop.examples.WordCount \
    --region=europe-west3 \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0 \
    -- ARG1 ARG2  
    

    CLI output (yaml configuration):

    id: try1
    jobs:
    - sparkJob:
        args:
        - ARG1
        - ARG2
        jarFileUris:
        - file:///usr/lib/spark/examples/jars/spark-examples.jar
        mainClass: org.apache.hadoop.examples.WordCount
        properties:
          spark.driver.cores: '2'
          spark.driver.memory: 10g
          spark.driver.userClassPathFirst: 'true'
          spark.dynamicAllocation.enabled: 'false'
          spark.executor.cores: '3'
          spark.executor.memory: 5g
          spark.executor.userClassPathFirst: 'true'
          spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
          spark.num.executors: '2'
      stepId: create_try1
    name: projects/your-project-id/regions/europe-west3/workflowTemplates/try1
    placement:
      managedCluster:
        clusterName: mycluster
    updateTime: '2021-08-25T03:30:47.365244Z'
    version: 3
    

    Copy the generated yaml configuration, open it in a text editor, and add the parameters: field. It will contain the arguments you want to accept.

    parameters:
    - name: ARG1
      fields:
      - jobs['create_try1'].sparkJob.args[0] # use the stepId in jobs[], in this example it is 'create_try1'
    - name: ARG2
      fields:
      - jobs['create_try1'].sparkJob.args[1]
    

    In this example, I placed it right after stepId:.

    Edited yaml configuration:

    id: try1
    jobs:
    - sparkJob:
        args:
        - ARG1
        - ARG2
        jarFileUris:
        - file:///usr/lib/spark/examples/jars/spark-examples.jar
        mainClass: org.apache.hadoop.examples.WordCount
        properties:
          spark.driver.cores: '2'
          spark.driver.memory: 10g
          spark.driver.userClassPathFirst: 'true'
          spark.dynamicAllocation.enabled: 'false'
          spark.executor.cores: '3'
          spark.executor.memory: 5g
          spark.executor.userClassPathFirst: 'true'
          spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
          spark.num.executors: '2'
      stepId: create_try1
    parameters:
    - name: ARG1
      fields:
      - jobs['create_try1'].sparkJob.args[0]
    - name: ARG2
      fields:
      - jobs['create_try1'].sparkJob.args[1]
    name: projects/your-project-id/regions/europe-west3/workflowTemplates/try1
    placement:
      managedCluster:
        clusterName: mycluster
    updateTime: '2021-08-25T03:13:25.014685Z'
    version: 3
    

    Overwrite your workflow template with the edited yaml file:

    gcloud dataproc workflow-templates import try1 \
        --region=europe-west3 \
        --source=config.yaml
    

    Run the template with gcloud dataproc workflow-templates instantiate:
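    For example (the parameter values below are only illustrative, taken from the arguments in the question; every parameter declared in the template must be given a value at instantiation time):

```shell
# Instantiate the parameterized template; supply a value for each declared parameter
gcloud dataproc workflow-templates instantiate try1 \
    --region=europe-west3 \
    --parameters="ARG1=20210820,ARG2=010000"
```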

    For more details, you can refer to Parameterization of Workflow Templates.

    【Discussion】:

    • Thank you, but I get an error when loading the config file: ERROR: (gcloud.dataproc.workflow-templates.import) INVALID_ARGUMENT: Invalid JSON payload received. Unknown name "parameters" at 'template.jobs[0]': Cannot find field. Invalid JSON payload received. Unknown name "fields" at 'template.jobs[1]': Cannot find field. .. - '@type': type.googleapis.com/google.rpc.BadRequest fieldViolations: - description: "Invalid JSON payload received. Unknown name \"parameters\" at 'template.jobs[0]':\ \ Cannot find field." field: template.jobs[0]
    • Could you edit your post and include the yaml file?
    • @OnofrioCiliberti I tested your yaml, and the parameters: field should not be indented. Try removing the spaces before parameters:.
    • Confirmed, it really was the whitespace :(. Now I will try launching it. You are very kind.
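    As the discussion above shows, the import error came from parameters: being indented under the job instead of sitting at the top level of the template. A quick way to catch this kind of mistake before importing is a small check on the raw file text (a sketch; the template snippet below is abbreviated from the question's yaml):

```python
# Detect top-level workflow-template keys that were accidentally indented.
# In a valid template, `parameters:` must start in column 0, as a sibling of `jobs:`.
TEMPLATE_TEXT = """\
id: create_file
jobs:
- sparkJob:
    args:
    - ARG1
  stepId: create_file_try
  parameters:
- name: ARG1
"""

def misindented_keys(text, keys=("parameters", "jobs", "placement")):
    """Return (line number, line) pairs where a supposed top-level key is indented."""
    bad = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        stripped = line.lstrip()
        if any(stripped.startswith(k + ":") for k in keys) and line != stripped:
            bad.append((lineno, line))
    return bad

print(misindented_keys(TEMPLATE_TEXT))  # flags the indented `parameters:` line
```

    Running this on the question's yaml flags line 7, the indented parameters: entry; removing the leading spaces fixes the import.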