【发布时间】:2017-10-24 04:51:27
【问题描述】:
我有一个问题,如何使用 spark 来操作/迭代/扫描多个 cassandra 表。我们的项目使用spark&spark-cassandra-connector连接cassandra来扫描多个表,尝试匹配不同表中的相关值,如果匹配,则采取额外的操作,例如插入表。用例如下:
sc.cassandraTable(KEYSPACE, "table1").foreach(
row => {
val company_url = row.getString("company_url")
sc.cassandraTable(keyspace, "table2").foreach(
val url = row.getString("url")
val value = row.getString("value")
if (company_url == url) {
sc.saveToCassandra(KEYSPACE, "target", SomeColumns(url, value))
}
)
})
问题是
由于 spark RDD 不可序列化,嵌套搜索将失败,导致 sc.cassandraTable 返回 RDD。我知道解决的唯一方法是使用 sc.broadcast(sometable.collect())。但是如果 sometable 很大,collect 会消耗掉所有的内存。而且,如果在用例中,多个表使用广播,它会耗尽内存。
RDD.persist 可以代替广播处理这种情况吗?就我而言,我使用 sc.cassandraTable 来读取 RDD 中的所有表并持久化回磁盘,然后将数据取回以进行处理。如果可行,我如何保证 rdd 读取是由块完成的?
除了spark,有没有其他工具(比如hadoop等??)可以优雅地处理这个案例?
【问题讨论】:
标签: apache-spark cassandra spark-cassandra-connector