【问题标题】:Query Spark SQL from Node.js server从 Node.js 服务器查询 Spark SQL
【发布时间】:2016-09-02 13:08:45
【问题描述】:

我目前正在使用 npm 的 cassandra-driver 从 Node.js 服务器查询我的 Cassandra 数据库。因为我希望能够编写更复杂的查询,所以我想使用 Spark SQL 而不是 CQL。有什么方法可以创建一个 RESTful API(或其他东西),以便我可以像目前使用 CQL 一样使用 Spark SQL?

换句话说,我希望能够将 Spark SQL 查询从我的 Node.js 服务器发送到另一台服务器并返回结果。

有没有办法做到这一点?我一直在寻找这个问题的解决方案,但还没有找到任何东西。

编辑:我可以在 Spark shell 中使用 Scala 和 Spark SQL 查询我的数据库,因此该位正在工作。我只需要以某种方式连接 Spark 和我的 Node.js 服务器。

【问题讨论】:

    标签: node.js apache-spark cassandra apache-spark-sql


    【解决方案1】:

    我遇到了类似的问题,我使用Spark-JobServer解决了。

    使用 Spark-Jobserver (SJS) 的主要方法通常是创建一个扩展其 SparkSQLJob 的特殊作业,例如以下示例:

    object ExecuteQuery extends SparkSQLJob {
      override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation = {
        // Code to validate the parameters received in the request body
      }
      override def runJob(sqlContext: SQLContext, jobConfig: Config): Any = {
        // Assuming your request sent a { "query": "..." } in the body:
        val df = sqlContext.sql(config.getString("query"))
        createResponseFromDataFrame(df) // You should implement this
      }
    }
    

    但是,要使这种方法与 Cassandra 很好地配合使用,您必须使用 spark-cassandra-connector,然后,要加载数据,您将有两个选择:

    1) 在通过 REST 调用此 ExecuteQuery 之前,您必须将要查询的完整数据从 Cassandra 传输到 Spark。为此,您可以执行类似的操作(改编自 spark-cassandra-connector documentation 的代码):

    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map( "table" -> "words", "keyspace" -> "test"))
      .load() 
    

    然后将其注册为表,以便 SparkSQL 能够访问它:

    df.registerAsTempTable("myTable") // As a temporary table
    df.write.saveAsTable("myTable") // As a persistent Hive Table
    

    只有在那之后,您才能使用ExecuteQuerymyTable 进行查询。

    2) 由于第一个选项在某些用例中可能效率低下,因此还有另一个选项。

    spark-cassandra-connector 有一个特殊的CassandraSQLContext,可用于直接从 Spark 查询 C* 表。它可以像这样使用:

    val cc = new CassandraSQLContext(sc)
    val df = cc.sql("SELECT * FROM keyspace.table ...")
    

    但是,要在 Spark-JobServer 中使用不同类型的上下文,您需要扩展 SparkContextFactory 并在创建上下文时使用它(可以通过对 /contexts 的 POST 请求来完成)。可以在SJS Gitub 上看到一个特殊上下文工厂的示例。您还必须创建一个SparkCassandraJob,扩展SparkJob(但这部分非常easy)。

    最后,ExecuteQuery 作业必须适应新的类。它会是这样的:

    object ExecuteQuery extends SparkCassandraJob {
      override def validate(cc: CassandraSQLContext, config: Config): SparkJobValidation = {
        // Code to validate the parameters received in the request body
      }
      override def runJob(cc: CassandraSQLContext, jobConfig: Config): Any = {
        // Assuming your request sent a { "query": "..." } in the body:
        val df = cc.sql(config.getString("query"))
        createResponseFromDataFrame(df) // You should implement this
      }
    }
    

    之后,ExecuteQueryjob 可以通过 POST 请求通过 REST 执行。


    结论

    这里我使用第一个选项,因为我需要HiveContext 中可用的高级查询(例如,窗口函数),而CassandraSQLContext 中没有这些查询。但是,如果您不需要这些操作,我推荐第二种方法,即使它需要一些额外的编码来为 SJS 创建一个新的 ContextFactory。

    【讨论】:

      猜你喜欢
      • 2019-03-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-15
      • 2015-10-27
      相关资源
      最近更新 更多