【发布时间】:2017-01-27 08:04:53
【问题描述】:
我在本地机器(8 核,16gb 内存)上设置了 Spark 2.0 和 Cassandra 3.0 用于测试目的,并编辑 spark-defaults.conf 如下:
spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4
接下来我在 Cassandra 中导入了 150 万行:
test(
tid int,
cid int,
pid int,
ev list<double>,
primary key (tid)
)
test.ev 是一个包含数值的列表,即[2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]
现在在代码中,为了测试整个事情,我刚刚创建了一个SparkSession,连接到 Cassandra 并进行简单的选择计数:
cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()
此时,Spark 输出count,完成Job 大约需要28 秒,分布在13 个Tasks 中(在Spark UI 中,Tasks 的总 Input 为 331.6MB)
问题:
- 这是预期的性能吗?如果没有,我错过了什么?
- 理论上说,DataFrame 的分区数决定了 Spark 将分配作业的任务数。如果我将
spark.sql.shuffle.partitions设置为 4,为什么要创建 13 个任务? (还要确保在我的 DataFrame 上调用rdd.getNumPartitions()的分区数)
更新
我想测试这个数据的一个常见操作:
- 查询一个大型数据集,例如,从 100,000 ~ N 行按
pid分组 - 选择
ev,list<double> - 对每个成员进行平均,假设现在每个列表的长度相同,即
df.groupBy('pid').agg(avg(df['ev'][1]))
正如 @zero323 建议的那样,我为这次测试部署了一台带有 Cassandra 的外部机器(2Gb RAM、4 核、SSD),并加载了相同的数据集。与我之前的测试相比,df.select().count() 的结果是预期更长的延迟和更差的整体性能(完成Job 大约需要 70 秒)。
编辑:我误解了他的建议。 @zero323 旨在让 Cassandra 执行计数,而不是使用 Spark SQL,如 here
中所述我还想指出,我知道为此类数据设置 list<double> 而不是宽行的固有反模式,但我现在担心的更多是花在检索大型数据集,而不是实际的平均计算时间。
【问题讨论】:
-
如果要执行计数,那么查询外部源会更有效。一般来说,很大程度上取决于你做什么。关于分区
spark.sql.shuffle.partitions此处未使用。初始分区数由数据源设置,计数始终使用 1 个任务进行最终聚合。 -
再次感谢@zero323。请检查我的更新。另外,如果我理解正确,您是说分区数是由 Cassandra 设置的?
-
OK
-
关于分区数,请查看
spark.cassandra.input.split.size_in_mb参数以及它与分区总数的关系。另外,为了简单计数,请查看 Cassandra 支持的 RDD 上的cassandraCount。
标签: apache-spark cassandra pyspark