【发布时间】:2017-03-13 04:48:01
【问题描述】:
我们在单个 AWS EC2 实例上以本地模式运行 Spark Java,使用
"local[*]"
但是,使用 New Relic 工具和简单的“顶部”进行分析表明,我们的 16 核机器中只有一个 CPU 内核用于我们编写的三个不同的 Java spark 作业(我们还尝试了不同的 AWS 实例但只使用了一个核心)。
Runtime.getRuntime().availableProcessors() 报告 16 个处理器和
sparkContext.defaultParallelism() 也报告了 16 个。
我查看了各种 Stackoverflow 本地模式问题,但似乎都没有解决问题。
非常感谢任何建议。
谢谢
编辑:过程
1) 使用 sqlContext 将使用 com.databricks.spark.csv 的压缩 CSV 文件 1 从磁盘 (S3) 读取到 DataFrame DF1。
2) 使用 sqlContext 将使用 com.databricks.spark.csv 的压缩 CSV 文件 2 从磁盘 (S3) 读取到 DataFrame DF2。
3) 使用 DF1.toJavaRDD().mapToPair(返回元组的新映射函数>) RDD1
4) 使用 DF2.toJavaRDD().mapToPair(返回元组的新映射函数>) RDD2
5) 在 RDD 上调用 union
6) 在联合的 RDD 上调用 reduceByKey() 以“按键合并”因此有一个元组>) 只有一个特定键的实例(因为相同的键出现在 RDD1 和 RDD2 中)。
7) 调用 .values().map(new mapping 函数,它遍历提供的 List 中的所有项目并根据需要合并它们以返回相同或更小的长度的 List
8) 调用 .flatMap() 获取 RDD
9) 使用 sqlContext 从 DomainClass 类型的平面地图创建 DataFrame
10) 使用 DF.coalease(1).write() 将 DF 作为 gzipped CSV 写入 S3。
【问题讨论】:
-
你的数据集中有多少个分区?如果您只有一个,您将无法从并行性中受益。
-
嗨蒂姆。谢谢回复。我正在使用 Spark CSV 将两个大型 CSV 文件读入 Dataframe,然后将它们转换为 JavaRDD,并对这些 RDD 执行许多手动转换和合并操作,从而生成一个 Dataframe。我再次使用 SparkCSV 写入磁盘。我不确定在这个过程中我可以在哪里手动指定分区?谢谢
-
您会自动获取分区,因此每 32M 应该有 1 个(在 NFS 文件系统上)或每 128M 应该有 1 个(在 HDFS 上)。听起来你肯定有更多的东西。你看过 Spark 进度条吗?您的操作可能不受 CPU 限制,而是 IO 或网络限制。
-
如果您可以将整个 DAG 发布到问题中,可能会更容易调试。
-
spark 进度条大部分时间都在
[Stage 4:=============================> (1 + 1) / 2]的 50% 处花费,进度条在多个小时内都没有移动。
标签: java amazon-web-services apache-spark amazon-ec2