【问题标题】:How to convert java Resultset into Spark dataframe如何将 java 结果集转换为 Spark 数据框
【发布时间】:2018-01-12 17:44:26
【问题描述】:

我正在尝试将preparestatement 与JDBC 一起使用。它产生 ResultSet 对象。我想把它转换成火花数据框。

object JDBCRead {

val tableName:String = "TABLENAME"
val url :String = "jdbc:teradata://TERADATA_URL/user=USERNAME,password=PWD,charset=UTF8,TYPE=FASTEXPORT,SESSIONS=10"
val  selectTable:String  = "SELECT * FROM " + tableName +" sample 10";

 val con : Connection = DriverManager.getConnection(url);


 val pstmt2: PreparedStatement = con.prepareStatement(selectTable)

import java.sql.ResultSet

val rs: ResultSet = pstmt2.executeQuery



val rsmd: ResultSetMetaData = rs.getMetaData
while(rs.next()!=null)
{
  val k: Boolean = rs.next()
  for(i<-1 to rsmd.getColumnCount) {
    print(" " + rs.getObject(i))
  }
  println()
}

}

我想从 Spark Dataframe 调用上面的代码,以便我可以将数据加载到 DataFrame 并更快地分布式获取结果。

我必须使用PreparedStatement。我不能使用spark.jdbc.load,因为 Teradata 的 FASTEXPORT 不适用于 jdbc 负载。它必须与PreparedStatement一起使用

如何做到这一点?如何将preparestatement 与SELECT 语句一起加载到Spark Dataframe 中。

【问题讨论】:

    标签: apache-spark dataframe jdbc teradata


    【解决方案1】:

    -

    AFAIK 有 2 个选项可用于此类要求 1.DataFrame 2.JdbcRDD

    我会提供JdbcRDD(因为您对preparedstatement 非常具体)

    compute 方法内部使用了prepareStatement。因此,您无需显式创建和维护连接(容易出错)。

    稍后您可以将结果转换为数据框

    为了速度,您可以配置其他参数。

    JdbcRDD的示例代码用法如下..

    import org.apache.log4j.{Level, Logger}
      import org.apache.spark.SparkContext
      import org.apache.spark.SparkContext.__
      import org.apache.spark.SparkConf
      import org.apache.spark.rdd.JdbcRDD
      import java.sql.{connection, DriverManager,ResultSet}
    
    
      object jdbcRddExample {
        def main(args: Array[String]) {
    
            // Connection String    
            VAL URL = "jdbc:teradata://SERVER/demo"
            val username = "demo"
            val password = "Spark"
            Class.forName("com.teradata.jdbc.Driver").newInstance
            // Creating & Configuring Spark Context
            val conf = new SparkConf().setAppName("App1").setMaster("local[2]").set("spark.executor.memory",1)
            val sc = new SparkContext(conf)
            println("Start...")
            // Fetching data from Database
            val myRDD = new JdbcRDD(sc,() => DriverManager.getConnection(url,username,password),
            "select first_name, last_name, gender from person limit ?,?",
            3,5,1,r => r.getString("last_name") + "," +r.getString("first_name"))
            // Displaying the content
            myRDD.foreach(println)
            // Saving the content inside Text File
            myRDD.saveAsTextFile("c://jdbcrdd")
    
            println("End...")
        }
      }
    

    【讨论】:

    • 谢谢。你能谈谈涉及数据帧的其他方法吗!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-09-28
    • 2017-04-28
    • 1970-01-01
    • 1970-01-01
    • 2013-12-01
    • 2019-11-04
    • 2017-07-08
    相关资源
    最近更新 更多