【发布时间】:2016-11-25 13:46:49
【问题描述】:
我已经用 python 编写了一个运行正常的 Spark 程序。
但是,它在内存消耗方面效率低下,我正在尝试对其进行优化。我在 AWS EMR 上运行它,而 EMR 因消耗过多内存而终止了这项工作。
Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
我相信这个内存问题是由于我在几个实例中收集了我的 RDD(即使用 .collect() ),因为在后期阶段,我需要测试由这些组成的列表中是否存在某些值RDD 与否。
所以,目前我的代码如下所示:
myrdd = data.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
稍后在代码中
if word in myrdd:
mylist.append(word)
myrdd2 = data2.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
if word in myrdd2:
mylist2.append(word)
然后我多次重复这个模式。
有没有办法做手术
if word in myrdd:
do something
不先收集rdd?
有没有类似 rdd.contains() 的函数?
P.S:我没有在内存中缓存任何东西。我的火花上下文如下所示:
jobName = "wordcount"
sc = SparkContext(appName = jobName)
......
......
sc.stop()
【问题讨论】:
-
不要使用 .collect() 它会将所有数据带到驱动程序,如果您有更大的数据集,则会产生问题。使用 myrdd2.foreachRDD 并检查值是否存在
-
word = sc.broadcast([w1,w2,w3]) valuepresent = myrdd.filter{lambda x : x in word} 类似这样的东西这也是我认为的一种解决方法
标签: python apache-spark pyspark