【发布时间】:2016-11-29 11:56:47
【问题描述】:
所以,我知道一般情况下应该在以下情况下使用coalesce():
由于
filter或其他可能导致减少原始数据集(RDD、DF)的操作,分区数减少。coalesce()对于过滤大型数据集后更有效地运行操作很有用。
我也知道它比repartition 便宜,因为它仅在必要时通过移动数据来减少洗牌。我的问题是如何定义coalesce 采用的参数(idealPartionionNo)。我正在处理一个从另一位工程师传递给我的项目,他正在使用以下计算来计算该参数的值。
// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)
val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR
然后将其与 partitioner 对象一起使用:
val partitioner = new HashPartitioner(idealPartionionNo)
但也用于:
RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)
这是正确的方法吗? idealPartionionNo 值计算背后的主要思想是什么? REPARTITION_FACTOR 是什么?我通常如何定义它?
此外,由于 YARN 负责即时识别可用的执行程序,因此有一种方法可以即时获取该数字 (AVAILABLE_EXECUTOR_INSTANCES) 并将其用于计算 idealPartionionNo(即,将 NO_OF_EXECUTOR_INSTANCES 替换为 @987654336 @)?
理想情况下,一些实际的表单示例:
- 这是一个数据集(大小);
- 这里有一些 RDD/DF 的转换和可能的重用。
- 这里是您应该重新分区/合并的地方。
- 假设您有
nexecutors 和mcores 和一个 partition factor 等于k
然后:
- 理想的分区数是 ==> ???
另外,如果您可以将我推荐给一个很好的博客来解释这些,我将不胜感激。
【问题讨论】:
标签: scala apache-spark rdd