【问题标题】:How to operate numPartitions, lowerBound, upperBound in the spark-jdbc connection?spark-jdbc连接中的numPartitions、lowerBound、upperBound如何操作?
【发布时间】:2018-07-25 22:11:37
【问题描述】:

我正在尝试使用 spark-jdbc 读取 postgres db 上的表。为此,我提出了以下代码:

object PartitionRetrieval {
  var conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.default.parallelism", "20")
  val log   = LogManager.getLogger("Spark-JDBC Program")
  Logger.getLogger("org").setLevel(Level.ERROR)
  val conFile       = "/home/myuser/ReconTest/inputdir/testconnection.properties"
  val properties    = new Properties()
  properties.load(new FileInputStream(conFile))
  val connectionUrl = properties.getProperty("gpDevUrl")
  val devUserName   = properties.getProperty("devUserName")
  val devPassword   = properties.getProperty("devPassword")
  val driverClass   = properties.getProperty("gpDriverClass")
  val tableName     = "base.ledgers"
  try {
    Class.forName(driverClass).newInstance()
  } catch {
    case cnf: ClassNotFoundException =>
      log.error("Driver class: " + driverClass + " not found")
      System.exit(1)
    case e: Exception =>
      log.error("Exception: " + e.printStackTrace())
      System.exit(1)
  }
  def main(args: Array[String]): Unit = {
    val spark   = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
    import spark.implicits._
    val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()
    val rc = gpTable.filter(gpTable("source_system_name")==="ORACLE" && gpTable("period_year")==="2017").count()
    println("gpTable Count: " + rc)
  }
}

现在,我正在获取行数只是为了查看连接是成功还是失败。这是一个巨大的表,它运行速度较慢以获取我理解的计数,因为没有为数据分区应该发生的分区号和列名提供参数。

在很多地方,我看到 jdbc 对象是通过以下方式创建的:

val gpTable2 = spark.read.jdbc(connectionUrl, tableName, connectionProperties) 

我使用options 以另一种格式创建了它。 当使用“选项”形成 jdbc 连接时,我无法理解如何给出 numPartitions,我希望在其上对数据进行分区的分区列名称:val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()

谁能告诉我

  1. 如何添加参数:numPartitions, lowerBound, upperBound 到这样写的jdbc对象:

    val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password", devPassword).load()

  2. 如何只添加 columnnamenumPartition 因为我想获取 年份的所有行:2017 年,我不想要一个范围 要选择的行数(lowerBound,upperBound)

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    numPartitions, lowerBound, upperBound and PartitionColumn 选项控制 spark 中的并行读取。您需要一个完整的 PartitionColumn 列。如果您的表中没有合适的列,那么您可以使用ROW_NUMBER 作为您的分区列。

    试试看,

    val rowCount = spark.read.format("jdbc").option("url", connectionUrl)
                                           .option("dbtable","(select count(*) AS count * from tableName where source_system_name = "ORACLE" AND "period_year = "2017")")
                                           .option("user",devUserName)
                                           .option("password",devPassword)
                                           .load()
                                           .collect()
                                           .map(row => row.getAs[Int]("count")).head
    

    我们得到了为提供的谓词返回的行数,该谓词可以用作 upperBount。

    val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
                                       .option("dbtable","(select ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS RNO, * from tableName source_system_name = "ORACLE" AND "period_year = "2017")")
                                       .option("user",devUserName)
                                       .option("password",devPassword)
                                       .option("numPartitions", 10)
                                       .option("partitionColumn", "RNO")
                                       .option("lowerBound", 1)
                                       .option("upperBound", rowCount)
                                       .load()
    

    numPartitions 取决于与 Postgres 数据库的并行连接数。您可以根据从数据库读取时所需的并行化进行调整。

    【讨论】:

    • 那么“RNO”将作为spark的一列对数据进行分区?
    • 是的。它将充当 partitionColumn。
    • 这个 ROW_NUMBER 查询在什么时候执行?是在开始时还是在每个分区的每个导入查询中只有一次?只是好奇一个无序的行号是否会导致导入数据框中的重复记录!?
    • @Adiga 这是从源读取数据时。
    【解决方案2】:

    要处理这样的查询,依赖 Spark 聚合是没有意义的。

    最好将工作委托给数据库:

    val sourceSystemName = "ORACLE"
    
    val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
       .option("dbtable",
         s"(SELECT COUNT(*) FROM $tableName WHERE source_system_name = '$sourceSystemName') AS t")
       .option("user",devUserName)
       .option("password",devPassword).load()
    

    无需额外配置,数据在其所在的位置得到尽可能高效的处理。

    【讨论】:

    • 所以不需要让Spark对收到的数据做分区?这不会使处理速度变慢吗?
    猜你喜欢
    • 2017-04-26
    • 1970-01-01
    • 1970-01-01
    • 2019-09-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多