【问题标题】:How can I connect to a postgreSQL database into Apache Spark using scala?如何使用 scala 将 postgreSQL 数据库连接到 Apache Spark?
【发布时间】:2014-09-15 00:19:13
【问题描述】:

我想知道如何在 scala 中执行以下操作?

  1. 使用 Spark scala 连接到 postgreSQL 数据库。
  2. 编写 SQL 查询(如 SELECT、UPDATE 等)来修改表 那个数据库。

我知道使用 scala 来做,但是如何在打包时将 psql scala 的连接器 jar 导入到 sbt 中?

【问题讨论】:

  • 为什么投反对票?我认为这是一个很好的问题。它非常通用,但答案也可以通用并帮助很多用户。
  • 你最终使用的是 mysql 还是 postgres?如果 postgres 可以看看你的 sbt 和代码示例吗?

标签: scala apache-spark psql


【解决方案1】:

我们的目标是从 Spark 工作线程运行并行 SQL 查询。

构建设置

将连接器和 JDBC 添加到 build.sbt 中的 libraryDependencies。我只在 MySQL 上试过这个,所以我会在我的例子中使用它,但 Postgres 应该差不多。

libraryDependencies ++= Seq(
  jdbc,
  "mysql" % "mysql-connector-java" % "5.1.29",
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)

代码

当您创建SparkContext 时,您告诉它要将哪些 jar 复制到执行程序。包括连接器罐。一个好看的方法:

val classes = Seq(
  getClass,                   // To get the jar with our own code.
  classOf[mysql.jdbc.Driver]  // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)

现在 Spark 已准备好连接到数据库。每个执行器都会运行部分查询,以便为分布式计算做好准备。

对此有两种选择。较旧的方法是使用org.apache.spark.rdd.JdbcRDD

val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => {
    sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  },
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,
  row => row.getString("BOOK_TITLE")
)

查看参数的文档。简要说明:

  • 你有SparkContext
  • 然后是创建连接的函数。这将在每个工作人员上调用以连接到数据库。
  • 然后是 SQL 查询。这必须与示例类似,并且包含开始键和结束键的占位符。
  • 然后您指定键的范围(在我的示例中为 0 到 1000)和分区数。范围将在分区之间划分。因此,在示例中,一个执行线程最终会执行 SELECT * FROM FOO WHERE 0 &lt;= KEY AND KEY &lt;= 100
  • 最后我们有了一个函数,可以将ResultSet 转换成某种东西。在示例中,我们将其转换为 String,因此您最终会得到 RDD[String]

从 Apache Spark 版本 1.3.0 开始,另一种方法可通过 DataFrame API 获得。你可以创建一个org.apache.spark.sql.DataFrame,而不是JdbcRDD

val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "dbtable" -> "BOOKS"))

请参阅https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases 了解完整的选项列表(可以像JdbcRDD 一样设置键范围和分区数)。

更新

JdbcRDD 不支持更新。但是您可以简单地在foreachPartition 中进行操作。

rdd.foreachPartition { it =>
  val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
  for (bookTitle <- it) {
    del.setString(1, bookTitle)
    del.executeUpdate
  }
}

(这会为每个分区创建一个连接。如果这是一个问题,请使用连接池!)

DataFrames 支持通过createJDBCTableinsertIntoJDBC 方法进行更新。

【讨论】:

  • 您的更新会为每个分区创建一个新连接。
  • 是的。每个分区可能在不同的机器上处理,因此它们不能共享一个连接。不过,您可以使用连接池,这样如果两个分区在同一台机器上、在同一线程中、一个接一个地处理,它们就可以重新使用连接。据我所知,Java 标准 API 中没有连接池,因此这会使示例变得非常复杂。但是,如果您知道一个好的解决方案,请告诉我!
  • 正确。 1.5.1 的新 Spark 文档显示了 3 个关于此案例的注意事项示例。他们有一个使用连接池的非常优雅的解决方案。
  • 太棒了!您在文档中有指向此页面的链接吗?谢谢!
  • 抱歉,我在该 URL 找不到任何有关连接池的信息。我错过了什么?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-04-26
  • 1970-01-01
  • 1970-01-01
  • 2017-12-04
  • 1970-01-01
  • 1970-01-01
  • 2020-08-05
相关资源
最近更新 更多