我遇到了类似的问题,我使用Spark-JobServer解决了。
使用 Spark-Jobserver (SJS) 的主要方法通常是创建一个扩展其 SparkSQLJob 的特殊作业,例如以下示例:
object ExecuteQuery extends SparkSQLJob {
override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation = {
// Code to validate the parameters received in the request body
}
override def runJob(sqlContext: SQLContext, jobConfig: Config): Any = {
// Assuming your request sent a { "query": "..." } in the body:
val df = sqlContext.sql(config.getString("query"))
createResponseFromDataFrame(df) // You should implement this
}
}
但是,要使这种方法与 Cassandra 很好地配合使用,您必须使用 spark-cassandra-connector,然后,要加载数据,您将有两个选择:
1) 在通过 REST 调用此 ExecuteQuery 之前,您必须将要查询的完整数据从 Cassandra 传输到 Spark。为此,您可以执行类似的操作(改编自 spark-cassandra-connector documentation 的代码):
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "words", "keyspace" -> "test"))
.load()
然后将其注册为表,以便 SparkSQL 能够访问它:
df.registerAsTempTable("myTable") // As a temporary table
df.write.saveAsTable("myTable") // As a persistent Hive Table
只有在那之后,您才能使用ExecuteQuery 从myTable 进行查询。
2) 由于第一个选项在某些用例中可能效率低下,因此还有另一个选项。
spark-cassandra-connector 有一个特殊的CassandraSQLContext,可用于直接从 Spark 查询 C* 表。它可以像这样使用:
val cc = new CassandraSQLContext(sc)
val df = cc.sql("SELECT * FROM keyspace.table ...")
但是,要在 Spark-JobServer 中使用不同类型的上下文,您需要扩展 SparkContextFactory 并在创建上下文时使用它(可以通过对 /contexts 的 POST 请求来完成)。可以在SJS Gitub 上看到一个特殊上下文工厂的示例。您还必须创建一个SparkCassandraJob,扩展SparkJob(但这部分非常easy)。
最后,ExecuteQuery 作业必须适应新的类。它会是这样的:
object ExecuteQuery extends SparkCassandraJob {
override def validate(cc: CassandraSQLContext, config: Config): SparkJobValidation = {
// Code to validate the parameters received in the request body
}
override def runJob(cc: CassandraSQLContext, jobConfig: Config): Any = {
// Assuming your request sent a { "query": "..." } in the body:
val df = cc.sql(config.getString("query"))
createResponseFromDataFrame(df) // You should implement this
}
}
之后,ExecuteQueryjob 可以通过 POST 请求通过 REST 执行。
结论
这里我使用第一个选项,因为我需要HiveContext 中可用的高级查询(例如,窗口函数),而CassandraSQLContext 中没有这些查询。但是,如果您不需要这些操作,我推荐第二种方法,即使它需要一些额外的编码来为 SJS 创建一个新的 ContextFactory。