【发布时间】:2016-01-22 17:10:22
【问题描述】:
Spark 用于计算 reduce 任务数的公式是什么?
我正在运行几个 spark-sql 查询,reduce 任务的数量始终为 200。这些查询的映射任务数量为 154。我使用的是 Spark 1.4.1。
这是否与spark.shuffle.sort.bypassMergeThreshold有关,默认为200
【问题讨论】:
标签: apache-spark apache-spark-sql
Spark 用于计算 reduce 任务数的公式是什么?
我正在运行几个 spark-sql 查询,reduce 任务的数量始终为 200。这些查询的映射任务数量为 154。我使用的是 Spark 1.4.1。
这是否与spark.shuffle.sort.bypassMergeThreshold有关,默认为200
【问题讨论】:
标签: apache-spark apache-spark-sql
你要的是spark.sql.shuffle.partitions。根据Spark SQL performance tuning guide:
| Property Name | Default | Meaning |
+-------------------------------+---------+------------------------------------------------+
| spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use |
| | | when shuffling data for joins or aggregations. |
另一个相关的选项是spark.default.parallelism,它决定了“用户未设置时由连接、reduceByKey 和并行化等转换返回的 RDD 中的默认分区数”,但 Spark SQL 和仅在处理普通 RDD 时相关。
【讨论】:
是的,@svgd,这是正确的参数。以下是在 Scala 中重置它的方法:
// Set number of shuffle partitions to 3
sqlContext.setConf("spark.sql.shuffle.partitions", "3")
// Verify the setting
sqlContext.getConf("spark.sql.shuffle.partitions")
【讨论】:
现在在 Spark 2 + 中设置此参数执行以下操作
spark.conf.set("spark.sql.shuffle.partitions", 16)
【讨论】:
通过mapreduce.input.fileinputformat.split 指定最小和最大拆分大小应该会有所帮助。这些参数分别决定了将输入文件分割成的最小和最大块大小。
val spark = SparkSession.builder
.config("mapreduce.input.fileinputformat.split.minsize", "1073741824")
.config("mapreduce.input.fileinputformat.split.maxsize", "1073741824")
.enableHiveSupport().getOrCreate()
这里,分割大小一直保持为 1GB(1073741824 字节)。 要记住 parquet、snappy 是可拆分的,而 gzip、lzo 不是。更多请参考here。
【讨论】: