【问题标题】:Scala - Unable to write Scala object into CassandraScala - 无法将 Scala 对象写入 Cassandra
【发布时间】:2015-01-20 17:29:48
【问题描述】:

我正在尝试使用 Spark 将 Scala 案例类对象写入 Cassandra。但是我在运行代码时遇到了异常。我想我无法将我的案例类对象映射到我的 Cassandra 行。我的 Scala 代码如下所示

CassandraPerformerClass.scala

object CassandraPerformerClass extends App
{
override def main(args: Array[String]) 
{
 val keyspace = "scalakeys1"
 val tablename = "demotable1"
 val conf = new SparkConf().setAppName("CassandraDemo") .setMaster("spark://ct-0015:7077") .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
 conf.set("spark.cassandra.connection.host", "192.168.50.103")
 conf.set("spark.cassandra.connection.native.port", "9041")
 conf.set("spark.cassandra.connection.rpc.port", "9160")
 val sc = new SparkContext(conf);
 CassandraConnector(conf).withSessionDo 
 { session =>
        session.execute("DROP KEYSPACE IF EXISTS "+keyspace+" ;");
        session.execute("CREATE KEYSPACE "+ keyspace +" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
        session.execute("CREATE TABLE "+keyspace+"."+tablename+" (keyval bigint, rangef bigint, arrayval text, PRIMARY KEY (rangef, keyval));");
        session.execute("CREATE INDEX index_11 ON "+keyspace+"."+tablename+" (keyval) ;");
  }

 val data = Seq(new Data(1, 10, "string1"), new Data(2, 20, "string2"));
 val collection = sc.parallelize(data)    
 collection.saveToCassandra(keyspace, tablename)
}

case class Data(kv : Long, rf : Long, av : String) extends Serializable
 {
   private var keyval : Long = kv
   private var rangef : Long = rf
   private var arrayval : String = av

   def setKeyval (kv : Long)
   {
     keyval = kv
   }
   def setRangef (rf : Long)
   {
     rangef = rf
   }
   def setArrayval (av : String)
   {
     arrayval = av
   }
   def getKeyval = keyval
   def getRangef = rangef
   def getArrayval = arrayval
   override def toString = keyval + "," + rangef + "," + arrayval
 }
}

例外

线程“main”java.lang.IllegalArgumentException 中的异常:RDD 中缺少某些主键列或未被选择:rangef、keyval 在 com.datastax.spark.connector.writer.DefaultRowWriter.checkMissingPrimaryKeyColumns(DefaultRowWriter.scala:44) 在 com.datastax.spark.connector.writer.DefaultRowWriter.(DefaultRowWriter.scala:71) 在 com.datastax.spark.connector.writer.DefaultRowWriter$$anon$2.rowWriter(DefaultRowWriter.scala:109) 在 com.datastax.spark.connector.writer.DefaultRowWriter$$anon$2.rowWriter(DefaultRowWriter.scala:107) 在 com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:170) 在 com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:23) 在 com.cleartrail.spark.scala.cassandra.poc.CassandraPerformerClass$.main(CassandraPerformerClass.scala:33) 在 com.cleartrail.spark.scala.cassandra.poc.CassandraPerformerClass.main(CassandraPerformerClass.scala)

请告诉我如何将我的案例类对象映射到 Cassandra 行。

【问题讨论】:

    标签: scala cassandra apache-spark


    【解决方案1】:

    用于 Spark 的基于 Scala 的连接器不期望具有字段获取器的 java-bean 类案例类。 (无论如何,这是一个不好的做法 - 案例类是类 bean 数据容器的不可变替代品,并且具有字段的默认访问器并且没有修改器)。

    使用与 Cassandra 表相同的名称和类型创建 case class 即可:

    case class Data(keyval: Long, rangef:Long , arrayval: String) extends Serializable
    

    【讨论】:

    • case 类默认也是可序列化的,因此无需显式扩展。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-01-30
    • 2012-09-27
    • 2017-10-07
    • 2016-09-13
    • 1970-01-01
    • 1970-01-01
    • 2017-02-09
    相关资源
    最近更新 更多