【问题标题】:Speed of counting RDD[Array[String]] vs RDD[String]计数 RDD[Array[String]] 与 RDD[String] 的速度
【发布时间】:2017-03-12 21:00:17
【问题描述】:

我正在尝试使用 count() 方法计算以下 RDD 元素。第一种如下:

scala> val data_wo_header=dropheader(data)
data_wo_header: org.apache.spark.rdd.RDD[String]

当我依靠这个时,我得到:

scala> data_wo_header.count()
res1: Long = 20000263 

这个操作比较快,大约需要26秒

现在我将 RDD 转换如下:

scala> val ratings_split = data_wo_header.map(line => line.split(",")).persist()
ratings_split: org.apache.spark.rdd.RDD[Array[String]]

scala> ratings_split.count()
res2: Long = 20000263  

此计数大约需要 5 分钟。有人可以提出为什么阅读计数的时间如此显着增加吗? drop header 函数看起来像这样只是为了删除第一行:

  def dropheader(data: RDD[String]): RDD[String] = {
   data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
     lines.drop(1)
    }
   lines
   })
  }

data 只是val data = sc.textFile(file, 2).cache()

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    第二个显然更长,因为您不仅要计算行数,还要将每一行转换为字符串数组。

    使用不带选项的 persist() 意味着它使用 MEMORY_ONLY,因此与使用 cache() 完全相同。

    现在 5 分钟似乎很昂贵,但这取决于您的配置(总内存、CPU)以及每行的元素数量。

    正如 Chobeat 所说,您需要使用 Spark UI 进行调查。

    【讨论】:

      【解决方案2】:

      好吧,您可以通过查看 Spark UI 并查看需要更多时间的阶段来更轻松地验证这一点。数据上的地图可能需要一些时间来检查整个数据集,这可以解释速度变慢。 persist() 也可能会引入一些开销,但我不确定。

      如果可以,我的建议是使用 CSV 数据源读取该 CSV。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2015-12-11
        • 2020-08-17
        • 1970-01-01
        • 1970-01-01
        • 2021-10-16
        • 2017-02-17
        • 1970-01-01
        相关资源
        最近更新 更多