【问题标题】:Load PostgreSQL database to SchemaRDD将 PostgreSQL 数据库加载到 SchemaRDD
【发布时间】:2015-02-24 18:21:28
【问题描述】:

我在 PostgreSQL 中有 100 万行和 100+ 列的数据源,我想使用 Spark SQL,所以我想将此数据源转换为 SchemaRDD

Spark SQL Programming Guide中介绍了两种方法, 一种是通过反射,这意味着我需要定义:

case class Row(Var1: Int, Var2: String, ...)

这很乏味,因为我有 100 多列。

另一种方法是“以编程方式指定架构”,这意味着我需要定义:

val schema =
  StructType(
    Seq(StructField("Var1", IntegerType), StructField("Var2", StringType), ...))

这对我来说也很乏味。

其实还有一个问题,因为我使用JdbcRDD类加载我的PostgreSQL数据库,但是我发现我还需要在JdbcRDD构造函数的mapRow参数中定义架构,如下所示:

def extractValues(r: ResultSet) = {
  (r.getInt("Var1"), r.getString("Var2"), ...)
}
val dbRDD = new JdbcRDD(sc, createConnection,
  "SELECT * FROM PostgreSQL OFFSET ? LIMIT ?",
  0, 1000000, 1, extractValues)

这个API还是要求我自己创建schema,更糟糕的是我需要重做类似的事情来将这个JdbcRDD转换为SchemaRDD,那将是非常笨拙的代码。

所以我想知道完成这项任务的最佳方法是什么?

【问题讨论】:

    标签: postgresql scala apache-spark apache-spark-sql


    【解决方案1】:

    您只需要支持有限数量的数据类型。为什么不使用

    java.sql.ResultSetMetaData
    

    例如

    val rs = jdbcStatement.executeQuery("select * from myTable limit 1")
    val rmeta = rs.getMetaData
    

    读取一行,然后为每一列动态生成所需的 StructField。

    你需要一个case语句来处理

    val myStructFields = for (cx <- 0 until rmeta.getColumnCount) {
           val jdbcType = rmeta.getColumnType(cx)
           } yield StructField(rmeta.getColumnName(cx),jdbcToSparkType(jdbcType))
    
    val mySchema = StructType(myStructFields.toSeq)
    

    其中 jdbcToSparkType 大致如下:

      def jdbcToSparkType(jdbcType: Int) = {
        jdbcType match {
           case 4 => InteegerType  
           case 6 => FloatType
            ..
       }  
    

    UPDATE 要生成 RDD[Row] :您将遵循类似的模式。在这种情况下,你会

    val rows = for (rs.next) {
        row = jdbcToSpark(rs)
        } yield row
    
    val rowRDD = sc.parallelize(rows)
    

    在哪里

    def jdbcToSpark(rs: ResultSet) = {
       var rowSeq = Seq[Any]()
       for (cx <- 0 to rs.getMetaData.getColumnCount) {
         rs.getColumnType(cx) match {
             case 4 => rowSeq :+ rs.getInt(cx)
              ..
         }
       }
       Row.fromSeq(rowSeq)
    }
    

    那么 val 行

    【讨论】:

      猜你喜欢
      • 2018-12-24
      • 1970-01-01
      • 1970-01-01
      • 2017-07-06
      • 1970-01-01
      • 2021-11-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多