Spark 使用延迟评估,允许引擎在非常精细的级别上优化 RDD 转换。
当你执行时
val DF1= spark.sql("select x,y from A,B ")
除了将变换添加到有向无环图之外,什么都不会发生。
只有当你执行一个Action,比如DF1.count,驱动才会强制执行一个物理执行计划。这会尽可能地推迟到 RDD 转换链的下游。
所以问是不对的
1.计算DF1真正需要多少时间
2.计算DF2需要多少时间和
至少基于您提供的代码示例。您的代码没有“计算”val DF1。我们可能不知道处理 DF1 花了多长时间,除非您以某种方式欺骗编译器分别处理每个数据帧。
一个更好的结构问题的方法可能是“我的工作总共分为多少个阶段(任务),完成这些阶段(任务)需要多长时间”?
这可以通过查看日志文件/Web GUI 时间线轻松回答(根据您的设置有不同的风格)
3. 将这些最终联接保留到 Hive / HDFS 需要多少时间
公平的问题。查看神经节
集群范围的监控工具(例如 Ganglia)可以深入了解集群的整体利用率和资源瓶颈。例如,Ganglia 仪表板可以快速显示特定工作负载是否受磁盘限制、网络限制或 CPU 限制。
我喜欢使用它的另一个技巧来定义必须以单独函数内的动作结束的每个转换序列,然后在“计时器函数”块内的输入 RDD 上调用该函数。
例如,我的“计时器”是这样定义的
def time[R](block: => R): R = {
val t0 = System.nanoTime()
val result = block
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0)/1e9 + "s")
result
}
并且可以用作
val df1 = Seq((1,"a"),(2,"b")).toDF("id","letter")
scala> time{df1.count}
Elapsed time: 1.306778691s
res1: Long = 2
但是,不要仅仅为了将 DAG 分解为更多阶段/范围的依赖关系而调用不必要的操作。这可能会导致洗牌或减慢您的执行速度。
资源:
https://spark.apache.org/docs/latest/monitoring.html
http://ganglia.sourceforge.net/
https://www.youtube.com/watch?v=49Hr5xZyTEA