【问题标题】:How to iterate over large Cassandra table in small chunks in Spark如何在 Spark 中以小块迭代大型 Cassandra 表
【发布时间】:2015-04-27 20:40:32
【问题描述】:

在我的测试环境中,我有 1 个 Cassandra 节点和 3 个 Spark 节点。我想迭代显然有大约 200k 行的大表,每行大约需要 20-50KB。

CREATE TABLE foo (
  uid timeuuid,
  events blob,
  PRIMARY KEY ((uid))
) 

这是在 spark 集群上执行的 scala 代码

val rdd = sc.cassandraTable("test", "foo")

// This pulls records in memory, taking ~6.3GB
var count = rdd.select("events").count()  

// Fails nearly immediately with 
// NoHostAvailableException: All host(s) tried for query failed [...]
var events = rdd.select("events").collect()

Cassandra 2.0.9,Spark:1.2.1,Spark-cassandra-connector-1.2.0-alpha2

我尝试只运行collect,而不运行count - 在这种情况下,它只是使用NoHostAvailableException 快速失败。

问题:一次迭代大表读取和处理小批量行的正确方法是什么?

【问题讨论】:

  • 您可以尝试在计数之前增加分区数,使用 repartition

标签: scala cassandra apache-spark rdd


【解决方案1】:

Cassandra Spark 连接器中有 2 个设置来调整块大小(将它们放在 SparkConf 对象中):

  • spark.cassandra.input.split.size:每个 Spark 分区的行数(默认 100000)
  • spark.cassandra.input.page.row.size:每个抓取页面的行数(即网络往返)(默认1000)

此外,您不应在示例中使用collect 操作,因为它会获取驱动程序应用程序内存中的所有行,并可能引发内存不足异常。只有当您确定它会产生少量行时,您才能使用 collect 操作。 count 操作不同,它只产生一个整数。因此,我建议您像以前一样从 Cassandra 加载数据,对其进行处理并存储结果(在 Cassandra、HDFS 等中)。

【讨论】:

    猜你喜欢
    • 2015-09-30
    • 1970-01-01
    • 2022-08-11
    • 1970-01-01
    • 2012-01-07
    • 2015-04-29
    • 1970-01-01
    • 2013-12-19
    • 2020-08-03
    相关资源
    最近更新 更多