【问题标题】:Converting CassandraRow obtained from joinWithCassandraTable to DataFrame将joinWithCassandraTable得到的CassandraRow转换为DataFrame
【发布时间】:2018-04-12 15:11:25
【问题描述】:
case class SourcePartition(id: String, host:String ,bucket: Int)
joinedRDDs =partitions.joinWithCassandraTable("db_name","table_name")
joinedRDDs.values.foreach(println)

我必须使用 joinWithCassandraTable ,如何将结果 CassandraRow 转换为 DataFrame?或者是否有任何等效的 joinWithCassandraTable 和 DataFrame ?

我必须一口气读取很多分区,我知道 Datastax Cassandra 连接器谓词下推,但它一次只允许拉一个分区(它似乎不允许 IN 运算符,只有 = 似乎支持)

【问题讨论】:

    标签: apache-spark cassandra spark-cassandra-connector


    【解决方案1】:
    val spark: SparkSession = SparkSession.builder().master("local[4]").appName("RDD2DF").getOrCreate()
        val sc: SparkContext = spark.sparkContext
    
        import spark.implicits._
    
        val internalJoinRDD = spark.sparkContext.cassandraTable("test", "test_table_1").joinWithCassandraTable("test", "table_table_2")
        internalJoin.toDebugString
    
        internalJoinRDD.toDF()
    

    你能不能试试上面的代码sn-p。

    如果您有数据架构,则可以使用

    def createDataFrame(internalJoinRDD: RDD[Row], schema: StructType): DataFrame
    

    【讨论】:

      猜你喜欢
      • 2017-10-30
      • 2016-05-09
      • 2018-12-26
      • 2017-10-01
      • 2017-03-17
      • 2017-03-23
      • 2019-01-21
      • 2018-08-29
      • 2020-01-24
      相关资源
      最近更新 更多