【发布时间】:2019-12-20 06:27:00
【问题描述】:
我正在调查一个有趣的案例,该案例涉及对慢速 RDD 或数据集进行广泛转换(例如重新分区和连接),例如以下代码定义的数据集:
val ds = sqlContext.createDataset(1 to 100)
.repartition(1)
.mapPartitions { itr =>
itr.map { ii =>
Thread.sleep(100)
println(f"skewed - ${ii}")
ii
}
}
慢速数据集是相关的,因为它类似于远程数据源的视图,并且分区迭代器源自单线程网络协议(http、jdbc 等),在这种情况下,下载速度 >单线程处理的速度,但是
不幸的是,传统的 Spark 计算模型在慢速数据集上效率不高,因为我们受限于以下选项之一:
仅使用窄转换 (flatMap-ish) 在单个线程中对流进行端到端的数据处理,显然数据处理将成为瓶颈,并且资源利用率会很低。
使用宽操作(包括重新分区)来平衡 RDD/数据集,虽然这对于并行数据处理效率至关重要,但 Spark 粗粒度调度程序要求完全完成下载,这成为另一个瓶颈.
实验
下面的程序是对这种情况的简单模拟:
val mapped = ds
val mapped2 = mapped
.repartition(10)
.map { ii =>
println(f"repartitioned - ${ii}")
ii
}
mapped2.foreach { _ =>
}
在执行上述程序时,可以观察到在RDD依赖中,println(f"repartitioned - ${ii}")行将不会在println(f"skewed - ${ii}")行之前执行。
我想指示 Spark 调度程序在其任务完成之前开始分发/传送分区迭代器生成的数据条目(通过微批处理或流等机制)。有没有一种简单的方法可以做到这一点?例如。将慢速数据集转换为结构化流会很好,但应该有更好集成的替代方案。
非常感谢您的意见
更新:为了让您的实验更容易,我附加了可以开箱即用的 scala 测试:
package com.tribbloids.spookystuff.spike
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.{FunSpec, Ignore}
@Ignore
class SlowRDDSpike extends FunSpec {
lazy val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
lazy val sc: SparkContext = spark.sparkContext
lazy val sqlContext: SQLContext = spark.sqlContext
import sqlContext.implicits._
describe("is repartitioning non-blocking?") {
it("dataset") {
val ds = sqlContext
.createDataset(1 to 100)
.repartition(1)
.mapPartitions { itr =>
itr.map { ii =>
Thread.sleep(100)
println(f"skewed - $ii")
ii
}
}
val mapped = ds
val mapped2 = mapped
.repartition(10)
.map { ii =>
Thread.sleep(400)
println(f"repartitioned - $ii")
ii
}
mapped2.foreach { _ =>
}
}
}
it("RDD") {
val ds = sc
.parallelize(1 to 100)
.repartition(1)
.mapPartitions { itr =>
itr.map { ii =>
Thread.sleep(100)
println(f"skewed - $ii")
ii
}
}
val mapped = ds
val mapped2 = mapped
.repartition(10)
.map { ii =>
Thread.sleep(400)
println(f"repartitioned - $ii")
ii
}
mapped2.foreach { _ =>
}
}
}
【问题讨论】:
-
我很犹豫是否建议它,因为它构建得非常糟糕,但是您是否尝试过使用
Spark Streaming?如果是这样,什么不起作用? -
我标题中的'stream'指的是Spark Streaming的2个实现,我没有找到解决方案
-
“Spark 粗粒度调度程序要求下载完全完成” - 为什么会出现这个问题?你能指定数据源吗?
-
是的,它很慢,需要几分钟
标签: scala apache-spark apache-spark-sql spark-streaming