【问题标题】:JDBC to Spark Dataframe - How to ensure even partitioning?JDBC 到 Spark Dataframe - 如何确保均匀分区?
【发布时间】:2019-10-25 07:32:52
【问题描述】:

我是 Spark 的新手,正在努力通过 JDBC 使用 spark.read.jdbc 从 Postgres 数据库表创建 DataFrame。

我对分区选项有些困惑,尤其是 partitionColumnlowerBoundupperBoundnumPartitions >.


  • 文档似乎表明这些字段是可选的。 如果我不提供它们会怎样?
  • Spark 如何知道如何对查询进行分区?效率如何?
  • 如果我指定了这些选项,我如何确保分区大小是大致的,即使 partitionColumn 分布不均匀?

假设我将有 20 个执行者,所以我将我的 numPartitions 设置为 20。
我的 partitionColumn 是一个自动递增的 ID 字段,假设值范围从 1 到 2,000,000
但是,由于用户选择处理一些非常旧的数据以及一些非常新的数据,中间没有任何内容,因此大多数数据的 ID 值要么低于 100,000,要么高于 1,900,000。

  • 我的第 1 位和第 20 位执行者会获得大部分工作,而其他 18 位执行者大部分时间都闲置吗?

  • 如果是这样,有没有办法防止这种情况发生?

【问题讨论】:

    标签: apache-spark jdbc apache-spark-sql partitioning


    【解决方案1】:

    所有这些选项是什么:spark.read.jdbc 是指从 RDBMS 中读取表。

    并行性是火花的力量,为了实现这一点,您必须提及所有这些选项。

    问题 :-)

    1) 文档似乎表明这些字段是可选的。如果我不提供会怎样?

    答案:默认并行度或并行度差

    根据场景,开发人员必须注意性能调优策略。并确保数据跨边界(也称为分区)拆分,而这些边界又将是并行任务。这样看。

    2) Spark 是如何知道如何对查询进行分区的?效率如何?

    jdbc-reads -referring to databricks docs

    您可以根据数据集的列值提供分割边界。

    • 这些选项指定读取时的并行度。
    • 如果指定了这些选项中的任何一个,则必须全部指定。

    注意

    这些选项指定表读取的并行度lowerBoundupperBound 决定分区步长,但不 过滤表中的行。因此,Spark 分区并返回所有 表格中的行。

    示例 1:
    您可以使用partitionColumnlowerBoundupperBoundnumPartitions 参数在emp_no 列上拆分跨执行程序读取的表。

    val df = spark.read.jdbc(url=jdbcUrl,
        table="employees",
        columnName="emp_no",
        lowerBound=1L,
        upperBound=100000L,
        numPartitions=100,
        connectionProperties=connectionProperties)
    

    numPartitions 也表示您要求 RDBMS 读取数据的并行连接数。如果您提供 numPartitions,那么您将限制连接数......而不会耗尽 RDBMS 端的连接......

    示例 2 来源:datastax presentation to load oracle data in cassandra

    val basePartitionedOracleData = sqlContext
    .read
    .format("jdbc")
    .options(
    Map[String, String](
    "url" -> "jdbc:oracle:thin:username/password@//hostname:port/oracle_svc",
    "dbtable" -> "ExampleTable",
    "lowerBound" -> "1",
    "upperBound" -> "10000",
    "numPartitions" -> "10",
    "partitionColumn" -> “KeyColumn"
    )
    )
    .load()
    

    该映射中的最后四个参数用于获取分区数据集。如果你通过其中任何一个, 你必须通过所有这些。

    当你传入这些额外的参数时,它的作用如下:

    它构建格式为

    的 SQL 语句模板
    SELECT * FROM {tableName} WHERE {partitionColumn} >= ? AND
    {partitionColumn} < ?
    

    它将 {numPartitions} 语句发送到数据库引擎。如果您提供了这些值:{dbTable=ExampleTable, lowerBound=1, upperBound=10,000, numPartitions=10, partitionColumn=KeyColumn},它会创建这十个 声明:

    SELECT * FROM ExampleTable WHERE KeyColumn >= 1 AND KeyColumn < 1001
    SELECT * FROM ExampleTable WHERE KeyColumn >= 1001 AND KeyColumn < 2000
    SELECT * FROM ExampleTable WHERE KeyColumn >= 2001 AND KeyColumn < 3000
    SELECT * FROM ExampleTable WHERE KeyColumn >= 3001 AND KeyColumn < 4000
    SELECT * FROM ExampleTable WHERE KeyColumn >= 4001 AND KeyColumn < 5000
    SELECT * FROM ExampleTable WHERE KeyColumn >= 5001 AND KeyColumn < 6000
    SELECT * FROM ExampleTable WHERE KeyColumn >= 6001 AND KeyColumn < 7000
    SELECT * FROM ExampleTable WHERE KeyColumn >= 7001 AND KeyColumn < 8000
    SELECT * FROM ExampleTable WHERE KeyColumn >= 8001 AND KeyColumn < 9000
    SELECT * FROM ExampleTable WHERE KeyColumn >= 9001 AND KeyColumn < 10000
    And then it would put the results of each of those queries in its own partition in Spark.
    

    问题 :-)

    如果我指定了这些选项,我如何确保分区 即使 partitionColumn 不均匀,大小也大致相同 分散式?

    我的第 1 位和第 20 位执行者会获得大部分工作,而其他 18个执行者坐在那里,大部分都是闲置的?

    如果是这样,有没有办法防止这种情况发生?


    所有问题都有一个答案

    下面是方法... 1)您需要了解每个分区有多少记录/行数......基于此您可以repartitioncoalesce

    片段 1:Spark 1.6 >
    spark 2.x 提供了了解分区中有多少记录的工具。

    spark_partition_id() 存在于org.apache.spark.sql.functions

    import org.apache.spark.sql.functions._ 
    val df = "<your dataframe read through rdbms.... using spark.read.jdbc>"
    df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show
    

    片段 2:适用于所有版本的 spark

    df
      .rdd
      .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
      .toDF("partition_number","NumberOfRecordsPerPartition")
      .show
    

    然后您需要再次合并您的策略在范围之间进行查询调整或重新分区等......,您可以使用 mappartitions 或 foreachpartitions

    结论:我更喜欢使用适用于数字列的给定选项,因为我已经看到它将数据划分为统一的 边界/分区。

    有时可能无法手动使用这些选项 需要调整分区/并行度...


    更新:

    With the below we can achive uniform distribution...

    1. 获取表的主键。
    2. 找出关键的最小值和最大值。
    3. 使用这些值执行 Spark。
    
    def main(args: Array[String]){
    // parsing input parameters ...
    val primaryKey = executeQuery(url, user, password, s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)
    val result = executeQuery(url, user, password, s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
        val min = result.getString(1).toInt
        val max = result.getString(2).toInt
        val numPartitions = (max - min) / 5000 + 1
    val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()
    var df = spark.read.format("jdbc").
    option("url", s"${url}${config("schema")}").
    option("driver", "com.mysql.jdbc.Driver").
    option("lowerBound", min).
    option("upperBound", max).
    option("numPartitions", numPartitions).
    option("partitionColumn", primaryKey).
    option("dbtable", config("table")).
    option("user", user).
    option("password", password).load()
    // some data manipulations here ...
    df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)      
    }
    

    【讨论】:

    • 谢谢。我知道其中的大部分内容,但我可以使用更多关于如何进行查询调整/重新分区的细节。也就是说,一旦我得到了 df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show 的输出,我到底该怎么做呢?
    • - 查询 ranges 之间的优化扫描,如果它的字符串列。例如,如果您有称为 10 个类别的字符串列...那么您可以像示例 2 中所示那样扫描每个类别。 - 重新分区:一般来说,重新分区可以在没有执行者 * 核心 * 复制因子的情况下完成。例如,您有 20 个执行程序 * 4 个核心 * 2-3 = 160-240 个分区,您可以使用。要了解分区是否具有大致相等数量的记录..您可以检查上面的代码 sn-ps 就是我的意思。
    【解决方案2】:

    我找到了一种手动指定分区边界的方法,使用 jdbc constructor with the predicates parameter

    它允许您明确指定要在每个分区的“where”子句中插入的单个条件,这允许您准确指定每个分区将接收的行范围。所以,如果你没有一个均匀分布的列来自动分区,你可以自定义你自己的分区策略。

    如何使用它的示例可以在this question 接受的答案中找到。

    【讨论】:

    • int_id &lt; 500000", "int_id &gt;= 500000 &amp;&amp; int_id &lt; 1000000 能工作的保证是什么。考虑连续添加和删除的示例,其中 int_id 可能以大约 1000 条记录结束,这只是第一个谓词......所以我们需要选择记录的最小值和最大值,并且我们必须作为谓词传递.. 请查看我的更新,这将在这种情况下工作。
    • @JoeMjr2 numPartitions 是否也适用于嵌套查询?
    猜你喜欢
    • 2016-11-16
    • 1970-01-01
    • 1970-01-01
    • 2018-11-24
    • 2015-04-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多