【发布时间】:2021-06-14 17:53:44
【问题描述】:
为了在 spark 中加载和分区传入的数据,我使用以下语法。
val dataframe = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", query)
.option("partitionColumn", partitionColumn)
.option("lowerBound", lowerBound_value)
.option("upperBound", upperBound_value)
.option("numPartitions", numPartitions)
.option("fetchsize", 15000)
.load()
参数partitionColumn、lowerBound、upperBound、numPartitions用于优化作业的性能。
我有一个包含 1000 条记录的表和一个整数列,其中的序列号从 1 到 1000。
我首先在该列上运行min 和max,以将min 值分配给lowerBound,将max 值分配给upperBound。 numPartitions 参数指定为 3,以便将传入数据均匀(或接近均匀)分成 3 个不同的分区。
上述设计在数据较少的情况下效果很好。但我有一个场景如下。
我有一个包含 2030 亿条记录的表,其中没有包含唯一/序列整数的整数列。然后有一个日期列,其数据分布在 5 年,即 2016-2021 年。 为了更快地移动数据,我每次都在移动每年一个月的数据。 这是我正在使用的查询:
val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
所以上面的查询变成了:
select * from table where date_column >= '2016-01-01' and date_time <= '2016-01-31 23:59:59.999''
以此类推,每年每个月的第一天和最后一天。
这是对我的循环方式的粗略描述:
(2016 to 2021) { year =>
(1 to 12) { month =>
val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
val dataframe = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", query)
.option("partitionColumn", partitionColumn)
.option("lowerBound", lowerBound_value)
.option("upperBound", upperBound_value)
.option("numPartitions", numPartitions)
.option("fetchsize", 15000)
.load()
}
}
为了找出界限,我使用了相同的月份和年份过滤器,如下所示:
val bounds = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", "(select min(partitionColumn) as mn, max(partitionColum) as from tablename where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as boundsDF")
.load()
val lowerBound_value = bounds.select("mn").head.getInt(0)
val upperBound_value = bounds.select("mx").head.getInt(0)
问题在于找到过滤数据的下限和上限。 由于数据量巨大,使用给定过滤器在 partitionColumn 上运行 min 和 max 的查询比将实际数据帧写入 hdfs 所花费的时间更多。
我尝试在那里给出随机值,但在任务运行时观察到分区中的数据倾斜。
是否必须将 partitionColumn 的 min 和 max 作为下限和上限以更好地分布数据? 如果没有,有没有办法指定下限和上限,而不是对数据运行最小和最大查询?
非常感谢任何帮助。
【问题讨论】:
标签: apache-spark apache-spark-sql