【问题标题】:How to use connection pool to postgresql in spark如何在spark中使用连接池到postgresql
【发布时间】:2018-12-18 20:04:38
【问题描述】:

我有一个 spark (1.2.1 v) 作业,它使用 postgresql.Driver for scala 将 rdd 的内容插入到 postgres:

rdd.foreachPartition(iter => {

        //connect to postgres database on the localhost
        val driver = "org.postgresql.Driver"
        var connection:Connection = null
        Class.forName(driver)
        connection = DriverManager.getConnection(url, username, password)
        val statement = connection.createStatement()

        iter.foreach(row => {
            val mapRequest = Utils.getInsertMap(row)
            val query = Utils.getInsertRequest(squares_table, mapRequest)

            try { statement.execute(query) } 
            catch {
                case pe: PSQLException => println("exception caught: " + pe);
            }
        })
        connection.close()
})

在上面的代码中,我为 rdd 的每个分区打开到 postgres 的新连接并关闭它。我认为正确的方法是使用连接池连接到我可以从中获取连接的 postgres(如 here 所述),但它只是伪代码:

rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
}

用 spark 的连接池连接 postgres 的正确方法是什么?

【问题讨论】:

    标签: postgresql scala apache-spark


    【解决方案1】:

    此代码适用于 spark 2 或更高版本和 scala,首先您必须添加 spark jdbc 驱动程序。

    如果您使用的是 Maven,那么您可以这样工作。将此设置添加到您的 pom 文件中

        <dependency>
            <groupId>postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.1-901-1.jdbc4</version>
        </dependency>
    

    将此代码写入scala文件

    import org.apache.spark.sql.SparkSession
    
    object PostgresConnection {
      def main(args: Array[String]) {
        val spark =
            SparkSession.builder()
            .appName("DataFrame-Basic")
            .master("local[4]")
            .getOrCreate()
    
       val prop = new java.util.Properties
       prop.setProperty("driver","org.postgresql.Driver")
       prop.setProperty("user", "username")
       prop.setProperty("password", "password")
      val url = "jdbc:postgresql://127.0.0.1:5432/databaseName"
      val df = spark.read.jdbc(url, "table_name",prop)
      println(df.show(5))
     }
    }
    

    【讨论】:

    • 这将为df的每个分区创建一个新连接,并且不使用由jdbc连接组成的ConnectionPool。
    猜你喜欢
    • 1970-01-01
    • 2013-10-12
    • 1970-01-01
    • 2016-07-05
    • 1970-01-01
    • 1970-01-01
    • 2014-09-15
    • 2014-11-04
    • 1970-01-01
    相关资源
    最近更新 更多