【问题标题】:Inserting a spark RDD, which contains n number of scala class objects into cassandra db将包含 n 个 scala 类对象的 spark RDD 插入 cassandra db
【发布时间】:2017-02-09 05:08:54
【问题描述】:

假设我有五个 scala 类的对象,我需要用这五个对象构建一个 spark RDD 并将该 RDD 推送到 cassandra 表中 我的 cassandra 表“人”有三个字段(pId、pName、pAge) 和

val object 1= new myclass(1,"abc",24)
val object2 = new myclass(2,"pqr",23)
val object3 = new myclass(3,"xyz",26)

我如何形成这三个对象的 rdd ? 下面的行可能吗

val collection=context.parallelize(Seq(object1,object2,object3))

如果可以制作 RDD .. 我如何将该 RDD 推送到 cassandra 表以在该表“人”中插入三行

【问题讨论】:

    标签: apache-spark cassandra rdd spark-cassandra-connector objectmapper


    【解决方案1】:

    最简单的做法是创建一个 CaseClass,其中的类与表中的行匹配

    case class PersonRow(pID: int, pName: String, pAge: Int)
    context.parallelize(Seq(
      PersonRow(1, "abc", 24),
      PersonRow(2, "pqr", 23),
      PersonRow(3, "xyz", 26)
    )).saveToCassandra("ks","person")
    

    欲了解更多信息,请参阅Spark Cassandra Connector Documentation

    编辑

    mapToRow 在 Scala 代码中是不必要的,因为它基本上是 Scala 中缺乏隐式的一种解决方法。 SaveToCassandra 通常使用隐含的RowWriterFactory,Scala 可以通过查看 RDD 类类型来为您完成此操作。在 Java 中,必须手动创建工厂。

    scala> class SomeRandomClass (val k:Int, val v:Int) {
         | def fun() = {println("lots of fun")}
         | val somethingElse:Int = 5
         | }
    defined class SomeRandomClass
    
    scala> sc.parallelize(1 to 10).map( x => new SomeRandomClass(x,x)).saveToCassandra("test","test")
    
    scala> sc.cassandraTable("test","test")
    res4: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[7] at RDD at CassandraRDD.scala:15
    
    scala> sc.cassandraTable("test","test").collect
    res5: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{k: 5, v: 5}, CassandraRow{k: 10, v: 10}, CassandraRow{k: 1, v: 1}, CassandraRow{k: 8, v: 8}, CassandraRow{k: 2, v: 2}, CassandraRow{k: 4, v: 4}, CassandraRow{k: 7, v: 7}, CassandraRow{k: 6, v: 6}, CassandraRow{k: 9, v: 9}, CassandraRow{k: 3, v: 3})
    

    请注意,这只是因为可以在类的字段(k 和 v)到表中的列“k 和 v”之间找到映射。

    【讨论】:

    • 是否可以在不使用案例类并将 Rdd 传递给 savetocssandra 的情况下完成,其中 RDD 包含对象集合,如 val empObj :emp = new emp(1,2,"yyyy") val collection = context.parallelize(Seq(empObj)) collection.saveToCassandra("smart", "emp")
    • 您可以使用元组,或类似 java Mbean 的类(参见文档),或 CassandraRow 对象。 github.com/datastax/spark-cassandra-connector/blob/… 无法映射泛型类,因为无法知道类和 CassandraTables 之间的映射应该是什么。
    • spark中是否有等效于“mapToRow” api的scala api和java api,以便我们可以直接将类结构映射到RDD。
    • mapToRow 只是创建 rowwriterfactory 的包装器,它在 scala 代码中隐式完成。基本上这意味着当您执行 rdd.saveToCassandra(ks,table) 时,会有隐式参数(隐式 RowWriterFactory[yourClass])。这意味着没有理由在 Scala 中实际调用 mapToRow。如果可以为您的类创建 rowWriterFactory,编译器会这样做。我将添加一个示例
    • 所以你的意思是说,通过为我的类实现我自己的 rowWriterFactory 方法,我可以在 scala 中自定义 saveToCassandra 函数中的列映射.. 对吗?
    猜你喜欢
    • 2016-01-30
    • 2018-09-14
    • 2021-02-03
    • 2018-03-11
    • 2019-11-12
    • 1970-01-01
    • 2020-06-25
    • 2017-06-26
    • 1970-01-01
    相关资源
    最近更新 更多