【问题标题】:Spark: PySpark + Cassandra query performanceSpark:PySpark + Cassandra 查询性能
【发布时间】:2017-01-27 08:04:53
【问题描述】:

我在本地机器(8 核,16gb 内存)上设置了 Spark 2.0 和 Cassandra 3.0 用于测试目的,并编辑 spark-defaults.conf 如下:

spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4

接下来我在 Cassandra 中导入了 150 万行:

test(
    tid int,
    cid int,
    pid int,
    ev list<double>,
    primary key (tid)
)

test.ev 是一个包含数值的列表,即[2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]

现在在代码中,为了测试整个事情,我刚刚创建了一个SparkSession,连接到 Cassandra 并进行简单的选择计数:

cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()

此时,Spark 输出count,完成Job 大约需要28 秒,分布在13 个Tasks 中(在Spark UI 中,Tasks 的总 Input 为 331.6MB)

问题:

  • 这是预期的性能吗?如果没有,我错过了什么?
  • 理论上说,DataFrame 的分区数决定了 Spark 将分配作业的任务数。如果我将 spark.sql.shuffle.partitions 设置为 4,为什么要创建 13 个任务? (还要确保在我的 DataFrame 上调用 rdd.getNumPartitions() 的分区数)

更新

我想测试这个数据的一个常见操作:

  • 查询一个大型数据集,例如,从 100,000 ~ N 行按pid 分组
  • 选择evlist&lt;double&gt;
  • 对每个成员进行平均,假设现在每个列表的长度相同,即df.groupBy('pid').agg(avg(df['ev'][1]))

正如 @zero323 建议的那样,我为这次测试部署了一台带有 Cassandra 的外部机器(2Gb RAM、4 核、SSD),并加载了相同的数据集。与我之前的测试相比,df.select().count() 的结果是预期更长的延迟和更差的整体性能(完成Job 大约需要 70 秒)。

编辑:我误解了他的建议。 @zero323 旨在让 Cassandra 执行计数,而不是使用 Spark SQL,如 here

中所述

我还想指出,我知道为此类数据设置 list&lt;double&gt; 而不是宽行的固有反模式,但我现在担心的更多是花在检索大型数据集,而不是实际的平均计算时间。

【问题讨论】:

  • 如果要执行计数,那么查询外部源会更有效。一般来说,很大程度上取决于你做什么。关于分区spark.sql.shuffle.partitions 此处未使用。初始分区数由数据源设置,计数始终使用 1 个任务进行最终聚合。
  • 再次感谢@zero323。请检查我的更新。另外,如果我理解正确,您是说分区数是由 Cassandra 设置的?
  • OK
  • 关于分区数,请查看spark.cassandra.input.split.size_in_mb 参数以及它与分区总数的关系。另外,为了简单计数,请查看 Cassandra 支持的 RDD 上的 cassandraCount

标签: apache-spark cassandra pyspark


【解决方案1】:

这是预期的表现吗?如果没有,我错过了什么?

它看起来很慢,但并不完全出乎意料。一般count表示为

SELECT 1 FROM table

其次是 Spark 侧求和。因此,虽然它经过优化,但效率仍然很低,因为您从外部源获取 N 个长整数只是为了在本地对它们求和。

正如 the docs 所解释的,Cassandra 支持的 RDD(不是 Datasets)提供了优化的 cassandraCount 方法来执行服务器端计数。

理论上说,DataFrame 的分区数决定了 Spark 将分配作业的任务数。如果我将 spark.sql.shuffle.partitions 设置为 (...),为什么要创建 (...) 任务?

因为这里没有使用spark.sql.shuffle.partitions。此属性用于确定随机分区的数量(当数据由一组键聚合时),而不是用于 Dataset 创建或像 count(*) 这样的全局聚合(始终使用 1 个分区进行最终聚合)。

如果您有兴趣控制初始分区的数量,您应该查看spark.cassandra.input.split.size_in_mb,它定义了:

要提取到 Spark 分区中的大约数据量。生成的 Spark 分区的最小数量为 1 + 2 * SparkContext.defaultParallelism

正如您在此处看到的另一个因素是 spark.default.parallelism,但它并不完全是一个微妙的配置,因此通常依赖它并不是最佳选择。

【讨论】:

  • 这确实非常具有说明性。谢谢。
【解决方案2】:

我看到这是一个非常古老的问题,但也许现在有人需要它。 在本地机器上运行 Spark 时,将 SparkConf 主服务器设置为“本地 [*]”非常重要,根据文档,它允许使用与机器上的逻辑核心一样多的工作线程运行 Spark。

与主“本地”相比,它帮助我将本地计算机上的 count() 操作性能提高了 100%。

【讨论】:

    猜你喜欢
    • 2015-08-18
    • 2018-10-19
    • 2015-11-09
    • 2015-08-25
    • 1970-01-01
    • 2015-08-01
    • 2017-08-13
    • 2017-08-11
    相关资源
    最近更新 更多