【问题标题】:"Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used" on an EMR cluster with 75GB of memory“容器因超出内存限制而被 YARN 杀死。使用 10.4 GB 的 10.4 GB 物理内存”在具有 75 GB 内存的 EMR 集群上
【发布时间】:2017-04-08 11:01:53
【问题描述】:

我在 AWS EMR 上运行一个 5 节点 Spark 集群,每个大小为 m3.xlarge(1 个主节点 4 个从节点)。我成功运行了一个 146Mb bzip2 压缩的 CSV 文件,最终得到了完美的汇总结果。

现在我正在尝试在此集群上处理 ~5GB bzip2 CSV 文件,但我收到此错误:

16/11/23 17:29:53 WARN TaskSetManager: Lost task 49.2 in stage 6.0 (TID xxx, xxx.xxx.xxx.compute.internal): ExecutorLostFailure (executorLostFailure (executor 16 exited由一个正在运行的任务引起))原因:容器因超出内存限制而被 YARN 杀死。使用了 10.4 GB 的 10.4 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead。

我很困惑为什么我在 ~75GB 集群上获得 ~10.5GB 内存限制(每个 3m.xlarge 实例 15GB)...

这是我的 EMR 配置:

[
 {
  "classification":"spark-env",
  "properties":{

  },
  "configurations":[
     {
        "classification":"export",
        "properties":{
           "PYSPARK_PYTHON":"python34"
        },
        "configurations":[

        ]
     }
  ]
},
{
  "classification":"spark",
  "properties":{
     "maximizeResourceAllocation":"true"
  },
  "configurations":[

  ]
 }
]

根据我的阅读,设置 maximizeResourceAllocation 属性应该告诉 EMR 配置 Spark 以充分利用集群上的所有可用资源。即,我应该有大约 75GB 的可用内存......那么为什么我会收到大约 10.5GB 的内存限制错误? 这是我正在运行的代码:

def sessionize(raw_data, timeout):
# https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window))))
    return sessions
def aggregate_sessions(sessions):
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated
 spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)
raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)
# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)
raw_data = raw_data.withColumn("timestamp",
                               convert_to_unix(pyspark.sql.functions.col("timestamp")))
sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)

基本上,无非就是开窗和 groupBy 来聚合数据。

从一些错误开始,然后停止增加相同错误的数量。

我尝试使用 --conf spark.yarn.executor.memoryOverhead 运行 spark-submit,但这似乎也不能解决问题。

【问题讨论】:

标签: apache-spark emr amazon-emr bigdata


【解决方案1】:

我感觉到你的痛苦..

在使用 Spark on YARN 时,我们也遇到过类似的内存不足问题。我们有五个 64GB、16 个核心的虚拟机,无论我们将spark.yarn.executor.memoryOverhead 设置为什么,我们都无法为这些任务获得足够的内存——无论我们给它们多少内存,它们最终都会死掉。这是一个相对简单的 Spark 应用程序,它导致了这种情况的发生。

我们发现虚拟机上的物理内存使用率很低,但虚拟内存使用率却非常高(尽管日志抱怨 物理 内存)。我们将yarn-site.xml 中的yarn.nodemanager.vmem-check-enabled 设置为false,我们的容器不再被杀死,应用程序似乎按预期工作。

做更多的研究,我找到了为什么会在这里发生的答案:http://web.archive.org/web/20190806000138/https://mapr.com/blog/best-practices-yarn-resource-management/

由于在 Centos/RHEL 6 上,由于操作系统行为会积极分配虚拟内存,因此您应该禁用虚拟内存检查器或将 yarn.nodemanager.vmem-pmem-ratio 增加到相对较大的值。

该页面有一个指向 IBM 非常有用的页面的链接:https://web.archive.org/web/20170703001345/https://www.ibm.com/developerworks/community/blogs/kevgrig/entry/linux_glibc_2_10_rhel_6_malloc_may_show_excessive_virtual_memory_usage?lang=en

总之,glibc > 2.10 改变了它的内存分配。尽管分配大量虚拟内存并不是世界末日,但它不适用于 YARN 的默认设置。

除了将yarn.nodemanager.vmem-check-enabled 设置为false,您还可以将MALLOC_ARENA_MAX 环境变量设置为hadoop-env.sh 中的一个较小的数字。这个错误报告有关于这方面的有用信息:https://issues.apache.org/jira/browse/HADOOP-7154

我建议通读这两页——信息非常方便。

【讨论】:

  • 属性是yarn.nodemanager.vmem-check-enabled,注意连字符
  • 我在 yarn-site.xml 中没有找到这个属性。我正在将 Spark 与 Amazon EMR 结合使用
  • @lfvv 您可能需要手动添加它。您可以在此处找到各种其他设置:hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-common/…
  • 我不认为告诉资源管理器不再正确管理其资源是一个很好的解决方案。
  • @ClemensValiente 我觉得你是对的......在这种情况下,也许调整 MALLOC_ARENA_MAX 是更好的方法。但是,我自己还没有尝试过。
【解决方案2】:

如果您不使用spark-submit,并且正在寻找另一种方法来指定Duff 提到的yarn.nodemanager.vmem-check-enabled 参数,这里有另外两种方法:

方法二

如果您使用的是 JSON 配置文件(传递给 AWS CLI 或 boto3 脚本),则必须添加以下配置:

[{
"Classification": "yarn-site", 
  "Properties": {
    "yarn.nodemanager.vmem-check-enabled": "false"
   }
}]

方法3

如果您使用 EMR 控制台,请添加以下配置:

classification=yarn-site,properties=[yarn.nodemanager.vmem-check-enabled=false]

【讨论】:

    【解决方案3】:

    看,

    我现在正在工作的一个巨大集群中遇到了同样的问题。给worker增加内存并不能解决问题。有时,进程中的聚合 spark 会使用比它拥有的更多的内存,并且 spark 作业将开始使用堆外内存。

    一个简单的例子是:

    如果您有一个需要 reduceByKey 的数据集,它有时会在一个工作人员中聚合比其他工作人员更多的数据,如果此数据超出了一名工作人员的内存,您会收到该错误消息。

    添加选项spark.yarn.executor.memoryOverhead如果你设置了50%的用于worker的内存对你有帮助(只是为了测试,看看它是否有效,你可以添加更少的更多的测试)。

    但您需要了解 Spark 如何处理集群中的内存分配:

    1. Spark 使用 75% 机器内存的更常见方式。剩下的交给 SO。
    2. Spark 在执行期间拥有two types 的内存。一部分用于执行,另一部分用于存储。执行用于 Shuffle、Join、Aggregations 等。存储用于在集群中缓存和传播数据。

    关于内存分配的一件好事,如果您在执行中不使用缓存,您可以设置 spark 使用该存储空间来执行执行,以避免部分 OOM 错误。正如您在 spark 的文档中看到的那样:

    这种设计确保了几个理想的属性。首先,不使用缓存的应用程序可以使用整个空间来执行,避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留一个最小存储空间 (R),其中它们的数据块不会被驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户具备内部分配内存的专业知识。

    但是我们如何使用它呢?

    您可以更改一些配置,将 MemoryOverhead 配置添加到您的工作调用中,但是,也可以考虑添加此配置:spark.memory.fraction 更改为 0.8 或 0.85 并将 spark.memory.storageFraction 减少到 0.35 或 0.2。

    其他配置可能会有所帮助,但需要根据您的情况进行检查。查看所有这些配置here

    现在,对我有什么帮助。

    我有一个包含 2.5K 工作人员和 2.5TB RAM 的集群。我们正面临像你一样的OOM错误。我们只是将spark.yarn.executor.memoryOverhead 增加到 2048。然后我们启用dynamic allocation。当我们调用工作时,我们不会为工作人员设置内存,我们将其留给 Spark 来决定。我们只是设置了 Overhead。

    但是对于我的小集群的一些测试,改变了执行和存储内存的大小。这样就解决了问题。

    【讨论】:

    • 我已经使用这些参数重新运行,使用 1+4 m3.xlarge 机器的集群:spark-submit --deploy-mode cluster --conf spark.executor.memory=12g --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.35,并在步骤开始时立即收到此错误:Exception in thread "main" java.lang.IllegalArgumentException: Required executor memory (12288+2048 MB) is above the max threshold (11520 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'
    • 这条信息是告诉你你需要做什么:你的spark.executor.memory+spark.yarn.executor.memoryOverhead必须小于yarn.nodemanager.resource.memory-mb。我建议你减少memoryOverhead,对于一个15g的节点,它可以是1g(1024 mb),我会把你的yarn.nodemanager.resource.memory-mb增加到12288 mb,把你的spark.executor.memory减少到11264 mb。如果这不起作用,则将yarn.nodemanager.resource.memory-mb 增加到13312 mb,并告诉我您的yarn.scheduler.maximum-allocation-mb 是什么。
    • 这是一个比公认的答案更好、干扰更少的选项。如果您已经在 Spark 正在执行的 YARN 实例上运行了其他应用程序,那么更改 yarn-site.xml 可能会非常冒险并且会产生广泛的后果。
    • 我不能说旧版本,但 Spark 2.3.1 在缺少堆时不使用堆外。它将内存分成 2 个池:执行和存储。当其中一个池溢出时,它会咬掉另一个池。当两个池都已满时,任务将被阻止,直到可用内存可用。
    • 这似乎是一个非常具有误导性的错误消息。因为看起来问题不在于 memoryOverhead 低,而在于整体执行程序内存。那么更好的解决方案/错误消息可能是增加执行程序内存?
    【解决方案4】:

    尝试重新分区。它适用于我的情况。

    数据框一开始加载write.csv()时并没有那么大。数据文件总共有10 MB左右,可能需要说执行程序中每个处理任务总共需要几个100 MB的内存。 我当时检查了分区数为2。 然后在与其他表连接、添加新列的以下操作中,它像滚雪球一样增长。然后我在某个步骤遇到了内存超出限制的问题。 我检查了分区的数量,它仍然是 2,我猜是从原始数据帧派生的。 所以我一开始尝试重新分区,就没有问题了。

    我还没有阅读很多关于 Spark 和 YARN 的资料。我所知道的是节点中有执行者。执行者可以根据资源处理许多任务。我的猜测是一个分区会被原子地映射到一个任务。它的体积决定了资源的使用量。如果一个分区变得太大,Spark 无法对其进行切片。

    一个合理的策略是先确定节点和容器内存,10GB 或者 5GB。理想情况下,两者都可以服务于任何数据处理工作,只是时间问题。给定 5GB 内存设置,你找到的一个分区的合理行,比如测试后是 1000 (在处理过程中不会失败任何步骤),我们可以按照以下伪代码进行:

    RWS_PER_PARTITION = 1000
    input_df = spark.write.csv("file_uri", *other_args)
    total_rows = input_df.count()
    original_num_partitions = input_df.getNumPartitions()
    numPartitions = max(total_rows/RWS_PER_PARTITION, original_num_partitions)
    input_df = input_df.repartition(numPartitions)
    

    希望对你有帮助!

    【讨论】:

      【解决方案5】:

      我在 spark 2.3.1 上运行相对较小的作业的小型集群上遇到了同样的问题。 该作业读取 parquet 文件,使用 groupBy/agg/first 删除重复项,然后排序并写入新的 parquet。它在 4 个节点(4 个 vcore,32Gb RAM)上处理了 51 GB 的 parquet 文件。

      作业在聚合阶段不断失败。我编写了 bash 脚本监视执行程序内存使用情况,发现在阶段中间,一个随机执行程序开始占用双倍内存几秒钟。当我将这一刻的时间与 GC 日志相关联时,它与清空大量内存的完整 GC 相匹配。

      最后我明白这个问题在某种程度上与 GC 有关。 ParallelGC 和 G1 经常导致这个问题,但 ConcMarkSweepGC 改善了这种情况。该问题仅出现在少量分区中。我在安装了OpenJDK 64-Bit (build 25.171-b10) 的 EMR 上运行了这项工作。我不知道问题的根本原因,它可能与 JVM 或操作系统有关。但在我的情况下,它绝对与堆或堆外使用无关。

      更新1

      试过Oracle HotSpot,问题重现。

      【讨论】:

        猜你喜欢
        • 2019-11-06
        • 1970-01-01
        • 2015-12-29
        • 2017-11-16
        • 2021-06-13
        • 2020-02-09
        • 2016-08-24
        • 1970-01-01
        • 2018-05-03
        相关资源
        最近更新 更多