【Question title】: Query Cassandra from Spark executor
【Posted】: 2016-11-17 21:47:03
【Question】:

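I have a streaming application that reads from Kafka, and I would like to know whether there is a way to run a range query from inside a map function.

I group the messages from Kafka by time range and key, and then, based on those time ranges and keys, I want to pull data from Cassandra into that DStream.

Something like: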

lookups
  .map(lookup => ((lookup.key, lookup.startTime, lookup.endTime), lookup))
  .groupByKey()
  .transform(rdd => {
    val cassandraSQLContext = new CassandraSQLContext(rdd.context)
    rdd.map(lookupPair => {
      val tableName = //variable based on lookup
      val startTime = lookupPair._1._2
      val endTime = lookupPair._1._3

      cassandraSQLContext
        .cassandraSql(s"SELECT * FROM ${CASSANDRA_KEYSPACE}.${tableName} WHERE key=${...} AND start_time >= ${startTime} AND start_time < ${endTime};")
        .map(row => {
           //match to {
            case /*case 1*/ => new object1(row)
            case /*case 2*/ =>new object2(row)
          }
        })
        .collect()
    })
  })

This gives me a NullPointerException:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 59.0 failed 1 times, most recent failure: Lost task 0.0 in stage 59.0 (TID 63, localhost): java.lang.NullPointerException
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231)
at org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:70)
at RollupFineGrainIngestionService$$anonfun$11$$anonfun$apply$2.apply(MyFile.scala:130)
at RollupFineGrainIngestionService$$anonfun$11$$anonfun$apply$2.apply(MyFile.scala:123)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)

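I also tried ssc.cassandraTable(CASSANDRA_KEYSPACE, tableName).where("key = ?", ...)..., but that crashes when it tries to access the StreamingContext inside the map.

If anyone has any suggestions, I would greatly appreciate it. Thanks!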

【Comments】:

    Tags: apache-spark cassandra spark-streaming spark-cassandra-connector


    【Answer 1】:

    If your query is based on the partition key, you probably want to use joinWithCassandraTable.

    But if you need more flexibility,

    CassandraConnector(sc.getConf).withSessionDo( session => ...)
    

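    will give you access to a session pool on the executor and let you do whatever you want, without managing the connections yourself. The code is all serializable and can be placed inside a map.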
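As a rough illustration of the CassandraConnector approach, the sketch below runs one range query per lookup on the executors. Several things here are assumptions and not part of the original post: the `Lookup` case class, the `RangeLookup` object, the `rangeCql` and `fetch` helpers, and the `tableFor` function that picks a table for each lookup are all hypothetical names. It also assumes a connector version built on the Java driver 3.x (where `Session.execute(String, Object...)` exists) and that `key` is a text column and `start_time` a bigint column. Rows are converted to strings only to keep the example short.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._

// Hypothetical lookup record; the field names follow the question's code.
case class Lookup(key: String, startTime: Long, endTime: Long)

object RangeLookup {
  // Builds the CQL text. Keyspace and table names cannot be bound as
  // parameters, so they are interpolated; the values are bound with '?'.
  def rangeCql(keyspace: String, table: String): String =
    s"SELECT * FROM $keyspace.$table WHERE key = ? AND start_time >= ? AND start_time < ?"

  // Runs one range query per lookup on the executors.
  // `tableFor` is an assumed helper that chooses the table for a lookup.
  def fetch(conf: SparkConf, keyspace: String, lookups: RDD[Lookup])(
      tableFor: Lookup => String): RDD[(Lookup, List[String])] = {
    // Created on the driver; the connector is serializable, so the
    // closure below can capture it and ship it to the executors.
    val connector = CassandraConnector(conf)
    lookups.map { lookup =>
      val cql = rangeCql(keyspace, tableFor(lookup))
      // withSessionDo borrows a pooled session and releases it afterwards,
      // so the result is materialized before the block returns.
      val rows = connector.withSessionDo { session =>
        session
          .execute(cql, lookup.key, Long.box(lookup.startTime), Long.box(lookup.endTime))
          .all()
          .asScala
          .map(_.toString)
          .toList
      }
      (lookup, rows)
    }
  }
}
```

Because only the connector (not a SQLContext or StreamingContext) is captured by the closure, this avoids touching driver-only objects inside the map, which is what the stack trace in the question points to.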

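    【Discussion】:

    • I don't think SparkConf is serializable; I ran into serialization problems. I'm also trying to avoid joinWithCassandraTable because I can't do range queries with it.
    • You can use range queries with joinWithCassandraTable; it accepts all the clauses that CassandraTableRDD accepts. And CassandraConnector is serializable: val cc = CassandraConnector(sc.getConf), then use cc wherever you like.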
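The suggestion in the last comment, a range clause on top of joinWithCassandraTable, might look roughly like the sketch below. The `JoinRangeLookup` object, its `rangeClause` value and `fetch` method are hypothetical names, and the sketch assumes the table's partition key is a single text column named `key` with `start_time` as a clustering column. Note that the `where` clause is shared by every key in the join, so this shape fits best when all keys in a batch use the same time range.

```scala
import com.datastax.spark.connector._
import org.apache.spark.rdd.RDD

object JoinRangeLookup {
  // Range predicate on the (assumed) clustering column; it is applied
  // to every partition that the join reads.
  val rangeClause: String = "start_time >= ? AND start_time < ?"

  // Joins an RDD of partition keys against the table and restricts each
  // joined partition to the given time range.
  def fetch(
      keys: RDD[Tuple1[String]],
      keyspace: String,
      table: String,
      start: Long,
      end: Long): RDD[(Tuple1[String], CassandraRow)] =
    keys
      .joinWithCassandraTable(keyspace, table) // joins on the partition key by default
      .where(rangeClause, start, end)
}
```

When lookups in the same batch carry different time ranges, one option is to split the batch by range and run one join per range; otherwise a per-lookup query through CassandraConnector is the more direct fit.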