【问题标题】:How does Spark work with a JDBC connection?Spark 如何使用 JDBC 连接?
【发布时间】:2018-07-24 17:52:27
【问题描述】:

我是 Spark 的新手,我正在尝试使用 spark-jdbc 程序来计算数据库中的行数。

我想出了这个代码:

object PartitionRetrieval {
    var conf  = new SparkConf().setAppName("Spark-JDBC")
    val log   = LogManager.getLogger("Spark-JDBC Program")
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conFile       = "/home/hmusr/ReconTest/inputdir/testconnection.properties"
    val properties    = new Properties()
    properties.load(new FileInputStream(conFile))
    val connectionUrl = properties.getProperty("gpDevUrl")
    val devUserName   = properties.getProperty("devUserName")
    val devPassword   = properties.getProperty("devPassword")
    val driverClass   = properties.getProperty("gpDriverClass")
    val tableName     = "source.bank_accounts"
    try {
    Class.forName(driverClass).newInstance()
    } catch {
    case cnf: ClassNotFoundException =>
        log.error("Driver class: " + driverClass + " not found")
        System.exit(1)
    case e: Exception =>
        log.error("Exception: " + e.printStackTrace())
        System.exit(1)
    }
    def main(args: Array[String]): Unit = {
        val spark   = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
        val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
                                                        .option("dbtable",tableName)
                                                        .option("user",devUserName)
                                                        .option("password",devPassword).load()
        val rc = gpTable.filter(gpTable("source_system_name")==="ORACLE").count()
        println("gpTable Count: " + rc)
    }
}

到目前为止,此代码正在运行。但我对此有两个概念上的怀疑。

  1. 在 Java 中,我们创建一个连接类并使用该连接来查询多个表,并在满足我们的要求后关闭它。但它似乎以不同的方式工作。 如果我必须查询数据库中的 10 个表,我是否应该使用该行 10 次并在其中使用不同的表名: 在 Java 中,我们创建一个连接类,并使用该连接来查询多个表,并在满足我们的要求后关闭它。但它似乎以不同的方式工作。 如果我必须查询数据库中的 10 个表,我是否应该在其中使用不同表名的这一行 10 次:

    val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
                                                    .option("dbtable",tableName)
                                                    .option("user",devUserName)
                                                    .option("password",devPassword).load()
    
  2. 这里使用的当前表总共有 2000 行。我可以相应地使用过滤器/选择/聚合函数。 但是在我们的生产中有几百万行的表,如果我在上面的语句中放了一个巨大的表,即使我们的要求稍后过滤了它,难道不是先创建一个巨大的数据框吗?

    李>

有人愿意就我上面提到的疑问给我一些见解吗?

【问题讨论】:

  • 进展如何?
  • 很好,我的朋友????
  • SO 是一个很好的网站——尽管人们可以在网络上阅读到那里的批评
  • 我同意你的意见,先生

标签: apache-spark


【解决方案1】:

将 SQL 查询传递给它首先称为下推到数据库。

例如

val dataframe_mysql = spark.read.jdbc(jdbcUrl, "(select k, v from sample where k = 1) e", connectionProperties)  

您可以用 s""" 将 k = 1 替换为主机变量,或者,构建您自己的 SQL 字符串并按照您的建议重用,但如果您不这样做,世界仍然存在。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-11-20
    • 1970-01-01
    • 1970-01-01
    • 2020-07-14
    • 1970-01-01
    • 2017-01-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多