【问题标题】:How can I wait for completion of an Elastic MapReduce job flow in a Java application?如何等待 Java 应用程序中的 Elastic MapReduce 作业流程完成?
【发布时间】:2014-12-20 08:46:40
【问题描述】:

最近我一直在使用 Amazon Web Services (AWS),我注意到没有太多关于这个主题的文档,所以我添加了我的解决方案。

我正在使用 Amazon Elastic MapReduce (Amazon EMR) 编写应用程序。计算结束后,我需要对他们创建的文件执行一些工作,因此我需要知道工作流程何时完成工作。

这是检查工作流程是否完成的方法:

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials);

DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest()
    .withJobFlowStates("COMPLETED");

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows();
JobFlowDetail detail = jobs.get(0);

detail.getJobFlowId(); //the id of one of the completed jobs

您还可以在 DescribeJobFlowsRequest 中查找特定的作业 ID,然后检查该作业是否已完成或失败。

我希望它会帮助其他人。

【问题讨论】:

  • 在这里非常欢迎立即提交您自己的问题解决方案,但是,所需的方法是将其拆分为一个问题和一个答案,请参阅It’s OK to Ask and Answer Your Own Questions - 这有助于对事物进行排序/分类适当地,即在适用的情况下为真正未回答的问题腾出空间,谢谢!
  • 谢谢,我会记下来作为将来的参考。
  • 您还应该包括其他已完成的状态。如果将jobAttributes 初始化为给定的,某些阅读本文的人可能会永远循环。 DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowRequest().withJobFlowStates( "COMPLETED", "TERMINATED", "FAILED" );

标签: java amazon-web-services elastic-map-reduce amazon-emr


【解决方案1】:

我也遇到了这个问题,这是我现在想出的解决方案。它并不完美,但希望它会有所帮助。作为参考,我使用的是 Java 1.7 和 AWS Java SDK 版本 1.9.13。

请注意,此代码假定您正在等待 cluster 终止,而不是 steps 严格地说;如果您的集群在所有步骤完成后终止,这没关系,但如果您使用的集群在步骤完成后仍保持活动状态,这对您没有太大帮助。

另外,请注意,此代码监控和记录集群状态更改,此外还诊断集群是否因错误而终止,如果发生则抛出异常。

private void yourMainMethod() {
    RunJobFlowRequest request = ...;

    try {
        RunJobFlowResult submission = emr.runJobFlow(request);
        String jobFlowId = submission.getJobFlowId();
        log.info("Submitted EMR job as job flow id {}", jobFlowId);

        DescribeClusterResult result = 
            waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS);
        diagnoseClusterResult(result, jobFlowId);
    } finally {
        emr.shutdown();
    }
}

private DescribeClusterResult waitForCompletion(
             AmazonElasticMapReduceClient emr, String jobFlowId,
             long sleepTime, TimeUnit timeUnit)
        throws InterruptedException {
    String state = "STARTING";
    while (true) {
        DescribeClusterResult result = emr.describeCluster(
                new DescribeClusterRequest().withClusterId(jobFlowId)
        );
        ClusterStatus status = result.getCluster().getStatus();
        String newState = status.getState();
        if (!state.equals(newState)) {
            log.info("Cluster id {} switched from {} to {}.  Reason: {}.",
                     jobFlowId, state, newState, status.getStateChangeReason());
            state = newState;
        }

        switch (state) {
            case "TERMINATED":
            case "TERMINATED_WITH_ERRORS":
            case "WAITING":
                return result;
        }

        timeUnit.sleep(sleepTime);
    }
}

private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) {
    ClusterStatus status = result.getCluster().getStatus();
    ClusterStateChangeReason reason = status.getStateChangeReason();
    ClusterStateChangeReasonCode code = 
        ClusterStateChangeReasonCode.fromValue(reason.getCode());
    switch (code) {
    case ALL_STEPS_COMPLETED:
        log.info("Completed EMR job {}", jobFlowId);
        break;
    default:
        failEMR(jobFlowId, status);
    }
}

private static void failEMR(String jobFlowId, ClusterStatus status) {
    String msg = "EMR cluster run %s terminated with errors.  ClusterStatus = %s";
    throw new RuntimeException(String.format(msg, jobFlowId, status));
}

【讨论】:

    【解决方案2】:

    作业流程完成后,集群停止,HDFS 分区丢失。 为了防止数据丢失,请将作业流的最后一步配置为将结果存储在 Amazon S3 中。

    如果 JobFlowInstancesDetail : KeepJobFlowAliveWhenNoSteps 参数设置为 TRUE,作业流程将 转换到 WAITING 状态,而不是在步骤完成后关闭。

    每个作业流程最多允许 256 个步骤。

    如果您的工作很耗时,我建议您定期存储结果。

    长话短说:无法知道何时完成。 相反,您需要将数据保存为工作的一部分。

    【讨论】:

      【解决方案3】:

      创建作业流时使用--wait-for-steps 选项。

      ./elastic-mapreduce --create \
      ...
       --wait-for-steps \
      ...
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2018-06-21
        • 2015-04-17
        • 1970-01-01
        • 1970-01-01
        • 2023-04-07
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多