【问题标题】:How to compare two datasets?如何比较两个数据集?
【发布时间】:2018-03-14 15:16:14
【问题描述】:

我正在运行一个 spark 应用程序,它从几个配置单元表(IP 地址)中读取数据,并将数据集中的每个元素(IP 地址)与其他数据集中的所有其他元素(IP 地址)进行比较。最终结果会是这样的:

+---------------+--------+---------------+---------------+---------+----------+--------+----------+
|     ip_address|dataset1|dataset2       |dataset3       |dataset4 |dataset5  |dataset6|      date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx|     1  |              1|              0|        0|         0|      0 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              0|              0|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              1|              0|        1|         0|      0 |2017-11-06|
---------------------------------------------------------------------------------------------------

为了进行比较,我将由 hiveContext.sql("query") 语句生成的 dataframes 转换为 Fastutil 对象。像这样:

val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

然后,我使用iterator 遍历每个集合并使用FileWriter 将行写入文件。

val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
     val p = dfIterator.next().toString
     //logic
}

我正在使用--num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g 运行应用程序

该过程总共运行大约 18-19 小时,每天大约有 4-5 百万条记录进行一对一比较。

但是,当我检查 Application Master UI 时,我注意到在完成 dataframesfastutil collection objects 的初始转换后没有任何活动发生(启动作业后只需几分钟)。我看到代码中使用的countcollect 语句产生了新的工作,直到转换完成。之后,运行比较时不会启动新作业。

  • 这意味着什么?这是否意味着分布式处理是 根本没有发生?

  • 我知道集合对象不被视为 RDD,可以
    这是什么原因?

  • spark 如何在不使用资源的情况下执行我的程序 分配?

任何帮助将不胜感激,谢谢!

【问题讨论】:

  • 我认为,一旦您在代码的//logic 部分提及您在做什么,您会得到更具体的答案。由于您提到数据集连接和查询输出会在几分钟内完成,因此很明显,您工作的其余时间(未分配)正在仅在驱动程序节点中执行的此逻辑部分中花费时间。如果您提到的只是write the rows to a file using FileWriter,那么您应该考虑将此写入分布式文件,例如在 hdfs 上并使用 df.write
  • 我很好奇:这些表只是 IP 地址集,每个表中大约有 4 到 500 万条记录,还是......?
  • 不,表格还有其他列,但我只选择 IP 地址。来自 6 个数据集的记录总数接近 500 万条。 @AKX
  • 公平。我尝试了一个 SQLite 数据库,其中有 5 个表,每个表只有 200 万个 IP 地址;查询其他 4 个表中的哪一个具有第一个在 45 秒 内完成的 IP 地址。

标签: scala apache-spark fastutil


【解决方案1】:

行后:

val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

特别是。上一行的那部分:

df.map(r => r(0).toString).collect()

其中collect 是需要注意的主要事项,从未在dfBuffer(这是一种常规的本地 JVM 数据结构)上执行任何 Spark 作业。

这是否意味着分布式处理根本没有发生?

正确。 collect 将所有数据带到驱动程序运行的单个 JVM 上(这正是你不应该这样做的原因,除非......你知道自己在做什么以及它可能导致什么问题)。

我认为以上回答了所有其他问题。


比较两个数据集(以 Spark 和分布式方式)的问题的一个可能解决方案是 join 一个数据集与参考数据集和 count 比较记录数是否没有变化。

【讨论】:

  • 我会尝试加入。还有一个问题,是否可以在不执行“收集”的情况下将数据帧转换为集合?如果我们执行“sc.parallelize(collection)”,集合是否会被视为 RDD?
  • 我认为你所指的集合是一个常规的 Scala 集合,它是本地的、非分布式的,并且存在于与 Spark 完全相反的单个 JVM 上。这就是您想要collecttake 或类似名称的原因。当您执行 sc.parallelize 时,您最终会得到一个 RDD。
猜你喜欢
  • 1970-01-01
  • 2011-07-30
  • 2015-07-10
  • 2014-08-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-09-19
相关资源
最近更新 更多