【问题标题】:Check if value exists in a RDD检查 RDD 中是否存在值
【发布时间】:2016-11-25 13:46:49
【问题描述】:

我已经用 python 编写了一个运行正常的 Spark 程序。

但是,它在内存消耗方面效率低下,我正在尝试对其进行优化。我在 AWS EMR 上运行它,而 EMR 因消耗过多内存而终止了这项工作。

 Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

我相信这个内存问题是由于我在几个实例中收集了我的 RDD(即使用 .collect() ),因为在后期阶段,我需要测试由这些组成的列表中是否存在某些值RDD 与否。

所以,目前我的代码如下所示:

myrdd = data.map(lambda word: (word,1))     \
       .reduceByKey(lambda a,b: a+b)   \
       .filter(lambda (a, b): b >= 5) \
       .map(lambda (a,b) : a)          \
       .collect()

稍后在代码中

if word in myrdd:
    mylist.append(word)

myrdd2 = data2.map(lambda word: (word,1))     \
       .reduceByKey(lambda a,b: a+b)   \
       .filter(lambda (a, b): b >= 5) \
       .map(lambda (a,b) : a)          \
       .collect()

if word in myrdd2:
    mylist2.append(word)

然后我多次重复这个模式。

有没有办法做手术

if word in myrdd: 
    do something

不先收集rdd?

有没有类似 rdd.contains() 的函数?

P.S:我没有在内存中缓存任何东西。我的火花上下文如下所示:

jobName = "wordcount"
sc = SparkContext(appName = jobName)

......
......

sc.stop()

【问题讨论】:

  • 不要使用 .collect() 它会将所有数据带到驱动程序,如果您有更大的数据集,则会产生问题。使用 myrdd2.foreachRDD 并检查值是否存在
  • word = sc.broadcast([w1,w2,w3]) valuepresent = myrdd.filter{lambda x : x in word} 类似这样的东西这也是我认为的一种解决方法

标签: python apache-spark pyspark


【解决方案1】:

来自 YARN 的错误消息说 collect 不是问题,因为您的执行程序(而不是驱动程序)有内存问题。

首先,尝试遵循错误消息建议并提升spark.yarn.executor.memoryOverhead - 在 YARN 上运行 pyspark 时,您可以告诉 YARN 为 python 工作进程分配更大的容器内存。

接下来,看看执行器需要大量内存的操作。您使用reduceByKey,也许您可​​以增加分区数量以使它们在使用的内存方面更小。看numPartitions参数:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey

最后,如果要检查 rdd 是否包含某个值,则只需按此值过滤并使用countfirst 进行检查,例如:

looking_for = "....."
contains = rdd.filter(lambda a: a == looking_for).count() > 0

【讨论】:

  • 谢谢。有很多 RDD 会给执行者带来压力吗?例如。如果我执行 myrddalias = myrdd 之类的操作,是否会给记忆带来额外压力?
  • 只是复制引用,不会克隆rdds本身
  • 问题是 looking_for 是一个 RDD,当我对另一个 RDD 进行过滤时,它会向我显示一个错误,说我不能将一个转换放入另一个转换中。 Find_for 是一个列表,我想根据 looking_for rdd 中是否存在某个值来修剪我的 rdd。确切的错误 - 例外:您似乎正在尝试广播 RDD 或从操作或转换中引用 RDD。 RDD 转换和操作只能由驱动程序调用,不能在其他转换内部调用;
猜你喜欢
  • 2015-08-21
  • 2016-06-17
  • 2011-05-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-03-10
  • 1970-01-01
相关资源
最近更新 更多