【问题标题】:Is there a way to get Step Functions input values into EMR step Args有没有办法让 Step Functions 输入值进入 EMR step Args
【发布时间】:2020-01-09 15:25:49
【问题描述】:

我们正在使用 AWS EMR 集群运行批处理 Spark 作业。这些作业会定期运行,我们希望通过 AWS Step Functions 进行编排。

截至 2019 年 11 月,Step Functions 已原生支持 EMR。向集群添加 Step 时,我们可以使用以下配置:

"Some Step": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
    "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId",
        "Step": {
            "Name": "FirstStep",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--class",
                    "com.some.package.Class",
                    "JarUri",
                    "--startDate",
                    "$.time",
                    "--daysToLookBack",
                    "$.daysToLookBack"
                ]
             }
         }
     },
     "Retry" : [
         {
             "ErrorEquals": [ "States.ALL" ],
             "IntervalSeconds": 1,
             "MaxAttempts": 1,
             "BackoffRate": 2.0
         }
     ],
     "ResultPath": "$.firstStep",
     "End": true
}

在 HadoopJarStep 的 Args 列表中,我们希望动态设置参数。例如如果状态机执行的输入是:

{
    "time": "2020-01-08",
    "daysToLookBack": 2
}

配置中以“$”开头的字符串。执行状态机时应相应替换,EMR集群上的步骤应运行command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate 2020-01-08 --daysToLookBack 2。但它运行的是command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate $.time --daysToLookBack $.daysToLookBack

有谁知道有没有办法做到这一点?

【问题讨论】:

    标签: amazon-emr aws-step-functions


    【解决方案1】:

    参数允许您定义键值对,因此“Args”键的值是一个数组,您将无法动态引用数组中的特定元素,您需要引用整个代替数组。例如“Args.$”:“$.Input.ArgsArray”。

    因此,对于您的用例,实现此目的的最佳方法是在调用此状态之前添加一个预处理状态。在预处理状态下,您可以调用 Lambda 函数并通过代码格式化您的输入/输出,或者对于像向数组添加动态值这样简单的操作,您可以使用 Pass State 重新格式化数据,然后在您的任务中您可以使用状态参数JSONPath 来获取您在预处理器中定义的数组。这是一个例子:

    {
    "Comment": "A Hello World example of the Amazon States Language using Pass states",
    "StartAt": "HardCodedInputs",
    "States": {
        "HardCodedInputs": {
            "Type": "Pass",
            "Parameters": {
                "cluster": {
                    "ClusterId": "ValueForClusterIdVariable"
                },
                "time": "ValueForTimeVariable",
                "daysToLookBack": "ValueFordaysToLookBackVariable"
            },
            "Next": "Pre-Process"
        },
        "Pre-Process": {
            "Type": "Pass",
            "Parameters": {
                "FormattedInputsForEmr": {
                    "ClusterId.$": "$.cluster.ClusterId",
                    "Args": [
                        {
                            "Arg1": "spark-submit"
                        },
                        {
                            "Arg2": "--class"
                        },
                        {
                            "Arg3": "com.some.package.Class"
                        },
                        {
                            "Arg4": "JarUri"
                        },
                        {
                            "Arg5": "--startDate"
                        },
                        {
                            "Arg6.$": "$.time"
                        },
                        {
                            "Arg7": "--daysToLookBack"
                        },
                        {
                            "Arg8.$": "$.daysToLookBack"
                        }
                    ]
                }
            },
            "Next": "Some Step"
        },
        "Some Step": {
            "Type": "Pass",
            "Parameters": {
                "ClusterId.$": "$.FormattedInputsForEmr.ClusterId",
                "Step": {
                    "Name": "FirstStep",
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args.$": "$.FormattedInputsForEmr.Args[*][*]"
                    }
                }
            },
            "End": true
        }
      }
    }
    

    【讨论】:

      【解决方案2】:

      您可以使用States.Array() 内部函数。您的Parameters 变为:

        "Parameters": {
          "ClusterId.$": "$.cluster.ClusterId",
          "Step": {
            "Name": "FirstStep",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
              "Jar": "command-runner.jar",
              "Args.$": "States.Array('spark-submit', '--class', 'com.some.package.Class', 'JarUri', '--startDate', $.time, '--daysToLookBack', '$.daysToLookBack')"
            }
          }
        }
      

      内部函数记录在here,但我认为它不能很好地解释用法。 Step Functions 控制台中提供的代码 sn-ps 更有用。

      请注意,您还可以使用 States.Format() 对 args 进行字符串格式化。例如,您可以使用输入变量作为最终路径段来构造路径:

      "Args.$": "States.Array('mycommand', '--path', States.Format('my/base/path/{}', $.someInputVariable))"
      

      【讨论】:

      • Step 函数的内在函数按预期工作以传递动态参数。感谢您指出这一点!
      • 一个 qustion ,可以运行一个集群 id 被步进函数停止
      • 我不知道谁需要阅读这篇文章,但只知道您在 States.Array() 调用中放置的任何字符串文字都需要在单引号内。如果您使用双引号,您将收到一条类似SCHEMA_VALIDATION_FAILED: The value for the field '<your field>' must be a valid JSONPath 的不清楚的错误消息。
      猜你喜欢
      • 1970-01-01
      • 2020-02-11
      • 2018-05-18
      • 2020-09-05
      • 2021-11-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多