【问题标题】:How to design finding lowerBound & upperBound for spark read statement to partition the incoming data?如何为 spark read 语句设计查找 lowerBound 和 upperBound 来对传入数据进行分区?
【发布时间】:2021-06-14 17:53:44
【问题描述】:

为了在 spark 中加载和分区传入的数据,我使用以下语法。

val dataframe = spark.read.format("jdbc")
          .option("url", url)
          .option("driver", driver)
          .option("user", user)
          .option("password", password)
          .option("dbtable", query)
          .option("partitionColumn", partitionColumn)
          .option("lowerBound", lowerBound_value)
          .option("upperBound", upperBound_value)
          .option("numPartitions", numPartitions)
          .option("fetchsize", 15000)
          .load()

参数partitionColumnlowerBoundupperBoundnumPartitions用于优化作业的性能。

我有一个包含 1000 条记录的表和一个整数列,其中的序列号从 1 到 1000。 我首先在该列上运行minmax,以将min 值分配给lowerBound,将max 值分配给upperBound。 numPartitions 参数指定为 3,以便将传入数据均匀(或接近均匀)分成 3 个不同的分区。

上述设计在数据较少的情况下效果很好。但我有一个场景如下。

我有一个包含 2030 亿条记录的表,其中没有包含唯一/序列整数的整数列。然后有一个日期列,其数据分布在 5 年,即 2016-2021 年。 为了更快地移动数据,我每次都在移动每年一个月的数据。 这是我正在使用的查询:

val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"

所以上面的查询变成了: select * from table where date_column &gt;= '2016-01-01' and date_time &lt;= '2016-01-31 23:59:59.999'' 以此类推,每年每个月的第一天和最后一天。

这是对我的循环方式的粗略描述:

(2016 to 2021) { year =>
   (1 to 12) { month =>
           val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
           val dataframe = spark.read.format("jdbc")
              .option("url", url)
              .option("driver", driver)
              .option("user", user)
              .option("password", password)
              .option("dbtable", query)
              .option("partitionColumn", partitionColumn)
              .option("lowerBound", lowerBound_value)
              .option("upperBound", upperBound_value)
              .option("numPartitions", numPartitions)
              .option("fetchsize", 15000)
              .load()
   }
}

为了找出界限,我使用了相同的月份和年份过滤器,如下所示:

val bounds = spark.read.format("jdbc")
          .option("url", url)
          .option("driver", driver)
          .option("user", user)
          .option("password", password)
          .option("dbtable", "(select min(partitionColumn) as mn, max(partitionColum) as from tablename where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as boundsDF")
          .load()

val lowerBound_value = bounds.select("mn").head.getInt(0)
val upperBound_value = bounds.select("mx").head.getInt(0)

问题在于找到过滤数据的下限和上限。 由于数据量巨大,使用给定过滤器在 partitionColumn 上运行 min 和 max 的查询比将实际数据帧写入 hdfs 所花费的时间更多。

我尝试在那里给出随机值,但在任务运行时观察到分区中的数据倾斜。

是否必须将 partitionColumn 的 min 和 max 作为下限和上限以更好地分布数据? 如果没有,有没有办法指定下限和上限,而不是对数据运行最小和最大查询?

非常感谢任何帮助。

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    拥有 200 多亿行,我希望您的表在您正在访问数据的同一日期列上分区在您的数据库中。没有它,查询将毫无希望。

    但是您是否尝试过在上下限中等效于日期/时间戳值的整数?检查 this reference 以了解 Spark 将整数值转换为时间戳。

    JDBC 选项 lowerBound 和 upperBound 被转换为 TimestampType/DateType 值的方式与将字符串转换为相同的方式 TimestampType/DateType 值。转换基于 Proleptic 公历和 SQL 配置定义的时区 spark.sql.session.timeZone。在 Spark 2.4 及更低版本中, 转换基于混合日历 (Julian + Gregorian) 和 默认系统时区。

    正如您所提到的,这里没有可以使用的预先存在的整数列。因此,对于您的循环,上限和下限是静态的,因此可以转换为静态的上下数值。基于 Spark 的内部结构,下限和上限值被划分为数字范围和multiple queries are thrown to DB,以在每个查询中获取单个分区的数据。这也意味着在相关列上进行表分区或在源数据库中具有适当的索引对性能非常重要。

    您需要确保将上限和下限的占位符正确放置在您提供的查询中。作为提醒;实际数值可能因使用的数据库系统而异。如果出现这种情况,即数据库系统与日期的整数转换不同,那么您将需要提供数据库而不是 Spark 接受的值。来自same docs

    参数: connectionFactory - 返回打开连接的工厂。 RDD 负责关闭连接。 sql - 查询的文本。查询必须包含两个?用于划分结果的参数的占位符。为了 例如,

    select title, author from books where ? <= id and id <= ?
       
    
    lowerBound - the minimum value of the first placeholder
    upperBound - the maximum value of the second placeholder The lower and upper bounds are inclusive.
    

    ...

    同样,很明显 = 被利用了,所以上限和下限都包含在内;我在其他问题上观察到的一个困惑点。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-09-24
      • 1970-01-01
      • 1970-01-01
      • 2017-04-26
      • 1970-01-01
      • 1970-01-01
      • 2017-07-08
      • 2021-02-05
      相关资源
      最近更新 更多