【问题标题】:How to convert a Cassandra ResultSet to a Spark DataFrame?如何将 Cassandra ResultSet 转换为 Spark DataFrame?
【发布时间】:2023-03-28 12:30:02
【问题描述】:

我通常会使用 Java 以这种方式将数据从 Cassandra 加载到 Apache Spark:

SparkContext sparkContext = StorakleSparkConfig.getSparkContext();

CassandraSQLContext sqlContext = new CassandraSQLContext(sparkContext);
    sqlContext.setKeyspace("midatabase");

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM store_customer " +
            "WHERE CAST(store_id as string) = '" + storeId + "'");

但是假设我有一个分片,我需要将几个分区键加载到这个 DataFrame 中。我可以在查询中使用 WHERE IN (...) 并再次使用 cassandraSql 方法。但是我有点不愿意使用 WHERE IN,因为在协调节点方面存在单点故障这一臭名昭著的问题。此处对此进行了解释:

https://lostechies.com/ryansvihla/2014/09/22/cassandra-query-patterns-not-using-the-in-query-for-multiple-partitions/

有没有办法使用多个查询但将它们加载到单个 DataFrame 中?

【问题讨论】:

    标签: apache-spark cassandra-2.0 datastax spark-cassandra-connector


    【解决方案1】:

    执行此操作的一种方法是运行单个查询和 unionAll/union 多个 DataFrame/RDD。

    SparkContext sparkContext = StorakleSparkConfig.getSparkContext();
    
    CassandraSQLContext sqlContext = new CassandraSQLContext(sparkContext);
        sqlContext.setKeyspace("midatabase");
    
    DataFrame customersOne = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM store_customer " + "WHERE CAST(store_id as string) = '" + storeId1 + "'");
    
    DataFrame customersTwo = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM store_customer " + "WHERE CAST(store_id as string) = '" + storeId2 + "'");
    
    DataFrame allCustomers = customersOne.unionAll(CustomersTwo)
    

    【讨论】:

    • 感谢您的回答!是的,我想到了这一点,但不确定 Spark 方面的性能影响。你觉得有吗?
    • @MilenKovachev Union 非常高效,因为它不需要任何洗牌。但是,请注意,它可能会删除您的分区。见这里:stackoverflow.com/questions/29977526/…
    • 假设我需要检索可变数量的键,我将不得不在 for 循环中运行查询。有没有办法并行运行各个 sqlContext.cassandraSql 语句?
    猜你喜欢
    • 2017-03-25
    • 1970-01-01
    • 2017-12-31
    • 2017-07-14
    • 2017-03-17
    • 2021-12-23
    • 2016-05-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多