【问题标题】:use spark to scan multiple cassandra tables using spark-cassandra-connector使用 spark-cassandra-connector 使用 spark 扫描多个 cassandra 表
【发布时间】:2017-10-24 04:51:27
【问题描述】:

我有一个问题,如何使用 spark 来操作/迭代/扫描多个 cassandra 表。我们的项目使用spark&spark-cassandra-connector连接cassandra来扫描多个表,尝试匹配不同表中的相关值,如果匹配,则采取额外的操作,例如插入表。用例如下:

sc.cassandraTable(KEYSPACE, "table1").foreach(
  row => {
     val company_url = row.getString("company_url")

     sc.cassandraTable(keyspace, "table2").foreach(
         val url = row.getString("url")
         val value = row.getString("value")
         if (company_url == url) {
            sc.saveToCassandra(KEYSPACE, "target", SomeColumns(url, value))
         }
     )
})

问题是

  1. 由于 spark RDD 不可序列化,嵌套搜索将失败,导致 sc.cassandraTable 返回 RDD。我知道解决的唯一方法是使用 sc.broadcast(sometable.collect())。但是如果 sometable 很大,collect 会消耗掉所有的内存。而且,如果在用例中,多个表使用广播,它会耗尽内存。

  2. RDD.persist 可以代替广播处理这种情况吗?就我而言,我使用 sc.cassandraTable 来读取 RDD 中的所有表并持久化回磁盘,然后将数据取回以进行处理。如果可行,我如何保证 rdd 读取是由块完成的?

  3. 除了spark,有没有其他工具(比如hadoop等??)可以优雅地处理这个案例?

【问题讨论】:

    标签: apache-spark cassandra spark-cassandra-connector


    【解决方案1】:

    看起来您实际上是在尝试执行一系列内连接。见

    joinWithCassandraTable方法

    这允许您使用 One RDD 的元素对 Cassandra 表进行直接查询。根据您从 Cassandra 读取的数据比例,这可能是您最好的选择。如果分数太大,最好分别读取两个表,然后使用 RDD.join 方法排列行。

    如果一切都失败了,您始终可以手动使用 CassandraConnector 对象直接访问 Java 驱动程序并使用来自分布式上下文的原始请求。

    【讨论】:

    • 我无法进行连接,因为在大多数情况下,我必须使用 string.contains 来比较相关列,而不是仅使用字符串相等运算符。
    • 这需要笛卡尔连接,除非你有像 Solr 这样的二级索引。
    • 谢谢。如果我进行笛卡尔连接,结果会不会很大,可能会耗尽内存?以及如何使用二级索引来做这些事情?
    猜你喜欢
    • 2017-08-06
    • 2016-09-02
    • 1970-01-01
    • 2020-10-02
    • 2019-04-10
    • 1970-01-01
    • 2019-10-06
    • 1970-01-01
    • 2019-10-15
    相关资源
    最近更新 更多