Posted: 2017-04-08 11:01:53
Question:
I'm running a 5-node Spark cluster on AWS EMR, each node of size m3.xlarge (1 master, 4 slaves). I successfully ran through a 146 MB bzip2-compressed CSV file and ended up with a perfectly aggregated result.
Now I'm trying to process a ~5 GB bzip2-compressed CSV file on this cluster, but I'm getting this error:
16/11/23 17:29:53 WARN TaskSetManager: Lost task 49.2 in stage 6.0 (TID xxx, xxx.xxx.xxx.compute.internal): ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
I'm confused as to why I'm hitting a ~10.5 GB memory limit on a ~75 GB cluster (15 GB per m3.xlarge instance)...
Here is my EMR configuration:
[
  {
    "classification": "spark-env",
    "properties": {},
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "PYSPARK_PYTHON": "python34"
        },
        "configurations": []
      }
    ]
  },
  {
    "classification": "spark",
    "properties": {
      "maximizeResourceAllocation": "true"
    },
    "configurations": []
  }
]
From what I've read, setting the maximizeResourceAllocation property should tell EMR to configure Spark to take full advantage of all the resources available on the cluster. That is, I should have roughly 75 GB of memory available... so why am I getting a ~10.5 GB memory-limit error?
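One thing worth noting is that the 10.4 GB figure in the error is a per-YARN-container (i.e. per-executor) limit, not a cluster-wide one. A rough back-of-envelope sketch, assuming maximizeResourceAllocation picked an executor heap around 9.6 GB on an m3.xlarge (that exact value is an assumption; check spark-defaults.conf on the cluster) and Spark's default memoryOverhead of max(384 MB, 10% of executor memory):

```python
# Sketch of the per-container limit YARN enforces on one executor.
# executor_memory_mb is an assumed value for illustration; the real
# figure set by maximizeResourceAllocation lives in spark-defaults.conf.

executor_memory_mb = 9658  # assumed executor heap (spark.executor.memory)
overhead_mb = max(384, int(executor_memory_mb * 0.10))  # Spark's default overhead

container_limit_mb = executor_memory_mb + overhead_mb
print(round(container_limit_mb / 1024, 1))  # ~10.4, matching "10.4 GB of 10.4 GB"
```

Under those assumed numbers, heap plus overhead lands right at the ~10.4 GB the error reports, which is consistent with a single executor's container hitting its ceiling rather than the cluster running out of memory.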
Here is the code I'm running:
def sessionize(raw_data, timeout):
    # https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session",
                             pyspark.sql.functions.when(
                                 pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1
                             ).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = (time_diff.withColumn(
        "session_id",
        pyspark.sql.functions.concat_ws(
            "_", "user_id", "site_id",
            pyspark.sql.functions.sum("new_session").over(window))))
    return sessions
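For readers unfamiliar with the lag/when/sum window idiom, the same sessionization logic can be sketched in plain Python for a single (user_id, site_id) partition (this is a hypothetical helper for illustration, not the question's code):

```python
# Plain-Python sketch of the sessionization logic for one
# (user_id, site_id) partition: a new session starts whenever the gap
# between consecutive timestamps reaches the timeout.

def assign_sessions(timestamps, timeout_seconds):
    session_ids = []
    session_id = 0
    prev = None
    for ts in sorted(timestamps):
        if prev is not None and ts - prev >= timeout_seconds:
            session_id += 1  # new_session = 1, folded into a running sum
        session_ids.append(session_id)
        prev = ts
    return session_ids

print(assign_sessions([0, 10, 20, 2000, 2010], timeout_seconds=1800))
# [0, 0, 0, 1, 1]
```

The running `sum("new_session").over(window)` in the PySpark version plays the role of the incrementing `session_id` counter here.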
def aggregate_sessions(sessions):
    # NOTE: udf defaults to returning StringType, so "foo" comes back
    # as a string here unless a returnType is passed explicitly.
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated
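The `collect_list` + UDF step is the memory-heavy part: every session's "foo" values are materialized as an in-memory list before `statistics.median` runs. Stripped of Spark, the per-group computation amounts to this (the rows here are hypothetical sample data):

```python
import statistics
from collections import defaultdict

# Plain-Python equivalent of the per-session median in aggregate_sessions,
# using hypothetical (session_id, foo) rows.
rows = [("s1", 1.0), ("s1", 3.0), ("s1", 2.0), ("s2", 10.0)]

groups = defaultdict(list)
for session_id, foo in rows:
    groups[session_id].append(foo)  # mirrors collect_list("foo")

medians = {sid: statistics.median(values) for sid, values in groups.items()}
print(medians)  # {'s1': 2.0, 's2': 10.0}
```

If one session is very large, that collected list lives entirely inside a single executor, which is exactly the kind of allocation that pushes a container past its YARN limit.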
spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)

raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)

# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)

raw_data = raw_data.withColumn(
    "timestamp",
    convert_to_unix(pyspark.sql.functions.col("timestamp")))

sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)
Basically, nothing more than windowing and a groupBy to aggregate the data.
It starts with a few of these errors, and then more and more of the same error pile up until the job halts.
I've tried running spark-submit with --conf spark.yarn.executor.memoryOverhead, but that doesn't seem to solve the problem either.
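For completeness, raising the overhead only helps if heap plus overhead still fits in the container; otherwise the executor heap has to come down as well. A hedged example of trading heap for overhead (the memory values and script/input names below are illustrative, not tuned for this workload):

```shell
# Explicitly shrink the executor heap and grow the YARN overhead so that
# heap + overhead stays under the container limit. Values are illustrative.
spark-submit \
  --conf spark.executor.memory=8g \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  process_raw_data.py s3://my-bucket/path/to/input.csv
```

Note that maximizeResourceAllocation writes its own spark.executor.memory into spark-defaults.conf, so an explicit --conf is needed to override it.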
Comments:
-
Would you mind posting the whole error log? Your description doesn't make much sense.
-
Hi @eliasah, please see pastebin.com/rPAXySWm for the full error log.
-
What is the value of spark.executor.memory? -
@mrsrinivas, I haven't set that value at all. I also couldn't find it in docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/…
-
Ah OK, @mrsrinivas, I found it in the Spark docs. The default seems to be 1 GB.
Tags: apache-spark emr amazon-emr bigdata