【发布时间】:2018-12-18 20:04:38
【问题描述】:
我有一个 spark (1.2.1 v) 作业,它使用 postgresql.Driver for scala 将 rdd 的内容插入到 postgres:
rdd.foreachPartition(iter => {
//connect to postgres database on the localhost
val driver = "org.postgresql.Driver"
var connection:Connection = null
Class.forName(driver)
connection = DriverManager.getConnection(url, username, password)
val statement = connection.createStatement()
iter.foreach(row => {
val mapRequest = Utils.getInsertMap(row)
val query = Utils.getInsertRequest(squares_table, mapRequest)
try { statement.execute(query) }
catch {
case pe: PSQLException => println("exception caught: " + pe);
}
})
connection.close()
})
在上面的代码中,我为 rdd 的每个分区打开到 postgres 的新连接并关闭它。我认为正确的方法是使用连接池连接到我可以从中获取连接的 postgres(如 here 所述),但它只是伪代码:
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
用 spark 的连接池连接 postgres 的正确方法是什么?
【问题讨论】:
标签: postgresql scala apache-spark