【问题标题】:Scala's collect inefficient in Spark?Scala 在 Spark 中的收集效率低下?
【发布时间】:2016-03-23 01:24:42
【问题描述】:

我目前开始学习在 Scala 中使用 spark。我正在处理的问题需要我读取一个文件,将每一行拆分为某个字符,然后过滤其中一个列与谓词匹配的行,最后删除一列。所以基本的、幼稚的实现是一个映射,然后是一个过滤器,然后是另一个映射。

这意味着要通过 3 次收集,这对我来说似乎很不合理。所以我尝试用一​​个集合(以部分函数作为参数的集合)替换它们。令我惊讶的是,这使它运行得更慢了。我在本地尝试了常规的 Scala 集合;正如预期的那样,后一种方式要快得多。

那为什么?我的想法是map和filter和map不是顺序应用的,而是混合成一个操作;换句话说,当一个动作强制评估时,列表中的每个元素都将被检查,待处理的操作将被执行。那正确吗 ?但即便如此,为什么collect表现如此糟糕?

编辑:显示我想要做什么的代码示例:

天真的方式:

sc.textFile(...).map(l => {
  val s = l.split(" ") 
  (s(0), s(1))
}).filter(_._2.contains("hello")).map(_._1)

收集方式:

sc.textFile(...).collect {
  case s if(s.split(" ")(0).contains("hello")) => s(0)
}

【问题讨论】:

  • 我试图了解您在“非天真”方式中究竟保存了什么。您认为第二种方法会较少使用哪种资源?您正在收集 RDD,这会强制将整个数据移动到一台机器上,这通常会带来性能损失或 OOM 异常。
  • 好吧,我想我不会。 Collect 以偏函数作为参数,“通过应用 f 返回一个包含所有匹配值的 RDD”。因此,据我了解,它的行为应该像 map ,只是它是一个偏函数。因此,即使它不快,也不应该慢很多。
  • 与问题无关,您的示例做不同的事情。 “朴素方式”返回拆分字符串的前部分,而“收集方式”返回输入字符串的第一个字符。

标签: scala apache-spark


【解决方案1】:

答案在于collect的实现:

/**
 * Return an RDD that contains all matching values by applying `f`.
 */
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  filter(cleanF.isDefinedAt).map(cleanF)
}

如您所见,它与filter->map 的序列相同,但在您的情况下效率较低。

在 Scala 中,isDefinedAtapplyPartialFunction 方法都评估 if 部分。

因此,在您的“收集”示例中,split 将为每个输入元素执行两次。

【讨论】:

  • 我在这里,认为收集与地图和过滤器不同。非常感谢您的回答好先生。
猜你喜欢
  • 2015-10-06
  • 1970-01-01
  • 1970-01-01
  • 2012-07-28
  • 1970-01-01
  • 2016-12-04
  • 1970-01-01
  • 1970-01-01
  • 2011-03-08
相关资源
最近更新 更多