【问题标题】:Is it possible to execute Step Concurrency for AWS EMR through AWS STEP FUNCTION without Lambda?是否可以在没有 Lambda 的情况下通过 AWS STEP FUNCTION 为 AWS EMR 执行 Step Concurrency?
【发布时间】:2020-05-02 11:44:25
【问题描述】:

这是我的场景,我正在尝试创建 4 个 AWS EMR 集群,其中每个集群将分配有 2 个作业,因此它就像使用 Step Function 编排的具有 8 个作业的 4 个集群。

我的流程应该是这样的:

4 个集群将同时启动并行运行 8 个作业,其中每个集群将并行运行 2 个作业。

现在,AWS 最近推出了这项功能,可以在 EMR 中使用 StepConcurrencyLevel 在单个集群中同时运行 2 个(或)多个作业,以减少集群的运行时间,这可以使用 EMR 控制台执行,AWS CLI(或)甚至通过 AWS lambda。

但是,我想使用 AWS Step Function 及其状态机语言在单个集群中并行启动 2 个(或)多个作业,就像这里提到的格式 https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html

我尝试引用许多站点来执行此过程,我通过控制台(或)通过 AWS lambda 中的 boto3 格式获得解决方案,但我找不到通过 Step 执行此过程的解决方案函数本身...

有什么解决办法吗!?

提前致谢..

【问题讨论】:

  • 你也可以通过 livy api 进行 spark 提交,有什么理由不使用它吗?
  • 是的..我有这个要求,通过步进函数的状态机语言只使用aws emr集群来执行整个过程......
  • 那不回答任何问题。此外,尝试使用自动扩展的单个 EMR 集群而不是 4 个集群

标签: amazon-web-services amazon-emr aws-step-functions


【解决方案1】:

所以,我浏览了更多网站并找到了解决问题的方法...

我面临的问题是 StepConcurrencyLevel,我可以在其中使用 AWS 控制台(或)通过 AWS CLI(或)甚至通过 Python 使用 BOTO3 添加它......但我期待使用状态机语言的解决方案,但我找到了一个。 ..

我们所要做的就是在使用状态机语言创建集群时,我们必须在其中指定 StepConcurrencyLevel,例如 2(或)3,其中默认值为 1。一旦设置好,然后在该集群下创建 4 个步骤并运行状态机。

集群将在哪里识别已设置的并发数并相应地运行步骤。

我的示例流程:

-> 我的编排的 JSON 脚本

 {
  "StartAt": "Create_A_Cluster",
  "States": {
    "Create_A_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
      "Parameters": {
        "Name": "WorkflowCluster",
        "StepConcurrencyLevel": 2,
        "Tags": [
          {
            "Key": "Description",
            "Value": "process"
          },
          {
            "Key": "Name",
            "Value": "filename"
          },
          {
            "Key": "Owner",
            "Value": "owner"
          },
          {
            "Key": "Project",
            "Value": "roject"
          },
          {
            "Key": "User",
            "Value": "user"
          }
        ],
        "VisibleToAllUsers": true,
        "ReleaseLabel": "emr-5.28.1",
        "Applications": [
          {
            "Name": "Spark"
          }
        ],
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "LogUri": "s3://prefix/prefix/log.txt/",
        "Instances": {
          "KeepJobFlowAliveWhenNoSteps": true,
          "InstanceFleets": [
            {
              "InstanceFleetType": "MASTER",
              "TargetSpotCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m4.xlarge",
                  "BidPriceAsPercentageOfOnDemandPrice": 90
                }
              ]
            },
            {
              "InstanceFleetType": "CORE",
              "TargetSpotCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m4.xlarge",
                  "BidPriceAsPercentageOfOnDemandPrice": 90
                }
              ]
            }
          ]
        }
      },
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 5,
          "MaxAttempts": 1,
          "BackoffRate": 2.5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Fail_Cluster"
        }
      ],
      "ResultPath": "$.cluster",
      "OutputPath": "$.cluster",
      "Next": "Add_Steps_Parallel"
    },
    "Fail_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
        "Message.$": "$.Cause"
      },
      "Next": "Terminate_Cluster"
    },
    "Add_Steps_Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Step_One",
          "States": {
            "Step_One": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "Step": {
                  "Name": "The first step",
                  "ActionOnFailure": "TERMINATE_CLUSTER",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "spark-submit",
                      "--deploy-mode",
                      "cluster",
                      "--master",
                      "yarn",
                      "--conf",
                      "spark.dynamicAllocation.enabled=true",
                      "--conf",
                      "maximizeResourceAllocation=true",
                      "--conf",
                      "spark.shuffle.service.enabled=true",
                      "--py-files",
                      "s3://prefix/prefix/pythonfile.py",
                      "s3://prefix/prefix/pythonfile.py"
                    ]
                  }
                }
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 5,
                  "MaxAttempts": 1,
                  "BackoffRate": 2.5
                }
              ],
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "ResultPath": "$.err_mgs",
                  "Next": "Fail_SNS"
                }
              ],
              "ResultPath": "$.step1",
              "Next": "Terminate_Cluster_1"
            },
            "Fail_SNS": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
                "Message.$": "$.err_mgs.Cause"
              },
              "ResultPath": "$.fail_cluster",
              "Next": "Terminate_Cluster_1"
            },
            "Terminate_Cluster_1": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Step_Two",
          "States": {
            "Step_Two": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "Step": {
                  "Name": "The second step",
                  "ActionOnFailure": "TERMINATE_CLUSTER",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "spark-submit",
                      "--deploy-mode",
                      "cluster",
                      "--master",
                      "yarn",
                      "--conf",
                      "spark.dynamicAllocation.enabled=true",
                      "--conf",
                      "maximizeResourceAllocation=true",
                      "--conf",
                      "spark.shuffle.service.enabled=true",
                      "--py-files",
                      "s3://prefix/prefix/pythonfile.py",
                      "s3://prefix/prefix/pythonfile.py"
                    ]
                  }
                }
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 5,
                  "MaxAttempts": 1,
                  "BackoffRate": 2.5
                }
              ],
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "ResultPath": "$.err_mgs_1",
                  "Next": "Fail_SNS_1"
                }
              ],
              "ResultPath": "$.step2",
              "Next": "Terminate_Cluster_2"
            },
            "Fail_SNS_1": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
                "Message.$": "$.err_mgs_1.Cause"
              },
              "ResultPath": "$.fail_cluster_1",
              "Next": "Terminate_Cluster_2"
            },
            "Terminate_Cluster_2": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId"
              },
              "End": true
            }
          }
        }
      ],
      "ResultPath": "$.steps",
      "Next": "Terminate_Cluster"
    },
    "Terminate_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
      "Parameters": {
        "ClusterId.$": "$.ClusterId"
      },
      "End": true
    }
  }
}

在此脚本(或)AWS Step Function 的状态机语言中,在创建集群时,我提到 StepConcurrencyLevel 为 2,并在集群下方添加了 2 个 spark 作业作为 Steps。

当我在 Step Function 中运行此脚本时,我能够编排集群和步骤以在集群中同时运行 2 个步骤,而无需直接在 AWS EMR 控制台(或)通过 AWS CLI(或)甚至通过 BOTO3 进行配置。

我只是使用状态机语言来执行编排,在 AWS Step Function 下的单个集群中同时运行 2 个步骤,而没有任何其他服务(如 lambda 或 livy API 或 BOTO3 等)的帮助...

流程图如下所示: AWS Step Function Workflow for concurrent step execution

为了更准确地了解我在上述状态机语言中插入 StepConcurrencyLevel 的位置,请点击此处:

"Create_A_Cluster": {
  "Type": "Task",
  "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
  "Parameters": {
    "Name": "WorkflowCluster",
    "StepConcurrencyLevel": 2,
    "Tags": [
      {
        "Key": "Description",
        "Value": "process"
      },

Create_A_Cluster下。

谢谢。

【讨论】:

  • 嗨,我有一个用例,我需要在 SF 的帮助下运行并发步骤。因此,我想知道几件事: 1. 如果从 SF 的状态延迟执行步骤,对集群有什么影响 2. 您是否遇到过任何主要的障碍 3. 这样做的利弊是什么它在 SF 的帮助下
  • 实际上,我们确实有一些缺点,例如并发步骤执行不会与我们拥有的集群配置同时运行 2 个步骤......它使用给定的配置加速了第一步,第二步是正在运行,但未达到完整的集群容量。那么,SF和EMR集群之间的通信是好的就没有问题了……
  • @NaveenB "ActionOnFailure": "TERMINATE_CLUSTER",对你有用吗?尝试此选项时出现错误验证异常。
  • @NaveenB 如果 step1 仍在运行并且 step2 失败会发生什么情况, step2 terminate_cluster_2 会等到第 1 步完成还是直接终止集群?
  • @sriharikalicharanTummala 它会等待,因为我们在并行类型下运行这些步骤,所以一旦并行下的两个步骤都完成,那么集群将终止
猜你喜欢
  • 2018-10-18
  • 1970-01-01
  • 1970-01-01
  • 2021-07-27
  • 1970-01-01
  • 1970-01-01
  • 2021-01-14
  • 2021-10-03
  • 1970-01-01
相关资源
最近更新 更多