【发布时间】:2018-03-14 15:16:14
【问题描述】:
我正在运行一个 spark 应用程序,它从几个配置单元表(IP 地址)中读取数据,并将数据集中的每个元素(IP 地址)与其他数据集中的所有其他元素(IP 地址)进行比较。最终结果会是这样的:
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| ip_address|dataset1|dataset2 |dataset3 |dataset4 |dataset5 |dataset6| date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx| 1 | 1| 0| 0| 0| 0 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 0| 0| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 1| 0| 1| 0| 0 |2017-11-06|
---------------------------------------------------------------------------------------------------
为了进行比较,我将由 hiveContext.sql("query") 语句生成的 dataframes 转换为 Fastutil 对象。像这样:
val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
然后,我使用iterator 遍历每个集合并使用FileWriter 将行写入文件。
val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
val p = dfIterator.next().toString
//logic
}
我正在使用--num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g 运行应用程序
该过程总共运行大约 18-19 小时,每天大约有 4-5 百万条记录进行一对一比较。
但是,当我检查 Application Master UI 时,我注意到在完成 dataframes 到 fastutil collection objects 的初始转换后没有任何活动发生(启动作业后只需几分钟)。我看到代码中使用的count 和collect 语句产生了新的工作,直到转换完成。之后,运行比较时不会启动新作业。
这意味着什么?这是否意味着分布式处理是 根本没有发生?
我知道集合对象不被视为 RDD,可以
这是什么原因?spark 如何在不使用资源的情况下执行我的程序 分配?
任何帮助将不胜感激,谢谢!
【问题讨论】:
-
我认为,一旦您在代码的
//logic部分提及您在做什么,您会得到更具体的答案。由于您提到数据集连接和查询输出会在几分钟内完成,因此很明显,您工作的其余时间(未分配)正在仅在驱动程序节点中执行的此逻辑部分中花费时间。如果您提到的只是write the rows to a file using FileWriter,那么您应该考虑将此写入分布式文件,例如在 hdfs 上并使用df.write -
我很好奇:这些表只是 IP 地址集,每个表中大约有 4 到 500 万条记录,还是......?
-
不,表格还有其他列,但我只选择 IP 地址。来自 6 个数据集的记录总数接近 500 万条。 @AKX
-
公平。我尝试了一个 SQLite 数据库,其中有 5 个表,每个表只有 200 万个 IP 地址;查询其他 4 个表中的哪一个具有第一个在 45 秒 内完成的 IP 地址。
标签: scala apache-spark fastutil