【发布时间】:2020-09-04 07:00:41
【问题描述】:
使用 Spark 2.4.0。 我的生产数据非常倾斜,因此其中一项任务花费的时间是其他任务的 7 倍。 我尝试了不同的策略来规范化数据,以便所有执行者都能平等地工作 -
- spark.default.parallelism
- reduceByKey(numPartitions)
- 重新分区(numPartitions)
我的期望是它们三个应该均匀分区,但是在 Spark Local/Standalone 上使用一些虚拟的非生产数据表明选项 1,2 比 3 更规范化。
数据如下:(我正在尝试对每个帐户的余额+ccy组合进行简单的减少
account}date}ccy}amount
A1}2020/01/20}USD}100.12
A2}2010/01/20}SGD}200.24
A2}2010/01/20}USD}300.36
A1}2020/01/20}USD}400.12
预期结果应该是[A1-USD,500.24], [A2-SGD,200.24], [A2-USD,300.36] 理想情况下,这些应该被划分为 3 个不同的分区。
javaRDDWithoutHeader
.mapToPair((PairFunction<Balance, String, Integer>) balance -> new Tuple2<>(balance.getAccount() + balance.getCcy(), 1))
.mapToPair(new MyPairFunction())
.reduceByKey(new ReductionFunction())
检查分区的代码
System.out.println("b4 = " +pairRDD.getNumPartitions());
System.out.println(pairRDD.glom().collect());
JavaPairRDD<DummyString, BigDecimal> newPairRDD = pairRDD.repartition(3);
System.out.println("Number of partitions = " +newPairRDD.getNumPartitions());
System.out.println(newPairRDD.glom().collect());
- 选项 1:什么都不做
- 选项 2:将 spark.default.parallelism 设置为 3
- 选项 3:reduceByKey 与 numPartitions = 3
-
选项4:重新分区(3)
对于选项 1 分区数 = 2 [ [(DummyString{account='A2', ccy='SGD'},200.24), (DummyString{ account='A2', ccy='USD'},300.36)], [(DummyString{account='A1', ccy='USD'},500.24)] ]
对于选项 2
分区数 = 3 [ [(DummyString{account='A1', ccy='USD'},500.24)], [(DummyString{account='A2', ccy='USD'},300.36)], [(DummyString{account='A2', ccy='SGD'},200.24)]]
对于选项 3 分区数 = 3 [ [(DummyString{account='A1', ccy='USD'},500.24)], [(DummyString{account='A2', ccy='USD'},300.36)], [(DummyString{account='A2', ccy='SGD'},200.24)] ]
对于选项 4 分区数 = 3 [[], [(虚拟字符串{ account='A2', ccy='SGD'},200.24)], [(DummyString{ account='A2', ccy='USD'},300.36), (DummyString{ account='A1', ccy='USD'},500.24)]]
结论: 选项 2(spark.default.parallelism) 和 3(reduceByKey(numPartitions) 归一化比选项 4(重新分区)好得多 相当确定的结果,从未见过 option4 归一化为 3 个分区。
问题:
- reduceByKey(numPartitions) 比 repartition 好很多
- 这仅仅是因为样本数据集太小了吗?或
- 当我们通过 YARN 集群提交时,这种行为是否会有所不同
【问题讨论】:
-
阶段或任务——我假设是后者。
-
1 个任务(在一个阶段的 250 多个任务中)花费了很长时间
-
你能展示一下 Spark UI 的东西吗?我怀疑250在输入端。什么是绝对值的 7 倍?
-
哪个阶段?为什么你认为散列应该都在单独的桶中?哈希结果就是它们的样子。
标签: performance apache-spark concurrency reducers