虽然 KrisP 提供的答案突出了所有重要差异,但我认为值得注意的是,mapPartitions 只是高级转换背后的低级构建块,而不是实现共享状态的方法。
虽然mapPartitions 可用于明确共享喜欢的状态,但在技术上它不共享(其生命周期仅限于mapPartitionsclosure`),还有其他方法可以实现它。特别是,在闭包内引用的变量在分区内共享。为了说明这一点,让我们玩一下单例:
object DummySharedState {
var i = 0L
def get(x: Any) = {
i += 1L
i
}
}
sc.parallelize(1 to 100, 1).map(DummySharedState.get).max
// res3: Long = 100
sc.parallelize(1 to 100, 2).map(DummySharedState.get).max
// res4: Long = 50
sc.parallelize(1 to 100, 50).map(DummySharedState.get).max
// res5: Long = 2
和 PySpark 中类似的东西:
-
单例模块dummy_shared_state.py:
i = 0
def get(x):
global i
i += 1
return i
-
主脚本:
from pyspark import SparkConf, SparkContext
import dummy_shared_state
master = "spark://..."
conf = (SparkConf()
.setMaster(master)
.set("spark.python.worker.reuse", "false"))
sc.addPyFile("dummy_shared_state.py")
sc.parallelize(range(100), 1).map(dummy_shared_state.get).max()
## 100
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 50
请注意,spark.python.worker.reuse 选项设置为 false。如果您保留默认值,您实际上会看到如下内容:
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 50
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 100
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 150
归根结底,您必须区分三种不同的事物:
- 广播变量,旨在减少网络流量和内存占用,方法是将变量的副本保留在工作线程上,而不是随每个任务一起提供
- 在闭包外部定义的变量和在闭包内部引用的变量必须随每个任务一起提供并为此任务共享
- 在闭包内定义的变量不共享
除此之外,还有一些与持久解释器的使用相关的 Python 特定问题。
在变量生命周期方面,map(filter 或其他转换)和mapPartitions 之间仍然没有实际区别。