所有这些选项是什么:spark.read.jdbc 是指从 RDBMS 中读取表。
并行性是火花的力量,为了实现这一点,您必须提及所有这些选项。
问题 :-)
1) 文档似乎表明这些字段是可选的。如果我不提供会怎样?
答案:默认并行度或并行度差
根据场景,开发人员必须注意性能调优策略。并确保数据跨边界(也称为分区)拆分,而这些边界又将是并行任务。这样看。
2) Spark 是如何知道如何对查询进行分区的?效率如何?
jdbc-reads -referring to databricks docs
您可以根据数据集的列值提供分割边界。
- 这些选项指定读取时的并行度。
- 如果指定了这些选项中的任何一个,则必须全部指定。
注意
这些选项指定表读取的并行度。 lowerBound 和 upperBound 决定分区步长,但不
过滤表中的行。因此,Spark 分区并返回所有
表格中的行。
示例 1:
您可以使用partitionColumn、lowerBound、upperBound 和numPartitions 参数在emp_no 列上拆分跨执行程序读取的表。
val df = spark.read.jdbc(url=jdbcUrl,
table="employees",
columnName="emp_no",
lowerBound=1L,
upperBound=100000L,
numPartitions=100,
connectionProperties=connectionProperties)
numPartitions 也表示您要求 RDBMS 读取数据的并行连接数。如果您提供 numPartitions,那么您将限制连接数......而不会耗尽 RDBMS 端的连接......
示例 2 来源:datastax presentation to load oracle data in cassandra:
val basePartitionedOracleData = sqlContext
.read
.format("jdbc")
.options(
Map[String, String](
"url" -> "jdbc:oracle:thin:username/password@//hostname:port/oracle_svc",
"dbtable" -> "ExampleTable",
"lowerBound" -> "1",
"upperBound" -> "10000",
"numPartitions" -> "10",
"partitionColumn" -> “KeyColumn"
)
)
.load()
该映射中的最后四个参数用于获取分区数据集。如果你通过其中任何一个,
你必须通过所有这些。
当你传入这些额外的参数时,它的作用如下:
它构建格式为
的 SQL 语句模板
SELECT * FROM {tableName} WHERE {partitionColumn} >= ? AND
{partitionColumn} < ?
它将 {numPartitions} 语句发送到数据库引擎。如果您提供了这些值:{dbTable=ExampleTable,
lowerBound=1, upperBound=10,000, numPartitions=10, partitionColumn=KeyColumn},它会创建这十个
声明:
SELECT * FROM ExampleTable WHERE KeyColumn >= 1 AND KeyColumn < 1001
SELECT * FROM ExampleTable WHERE KeyColumn >= 1001 AND KeyColumn < 2000
SELECT * FROM ExampleTable WHERE KeyColumn >= 2001 AND KeyColumn < 3000
SELECT * FROM ExampleTable WHERE KeyColumn >= 3001 AND KeyColumn < 4000
SELECT * FROM ExampleTable WHERE KeyColumn >= 4001 AND KeyColumn < 5000
SELECT * FROM ExampleTable WHERE KeyColumn >= 5001 AND KeyColumn < 6000
SELECT * FROM ExampleTable WHERE KeyColumn >= 6001 AND KeyColumn < 7000
SELECT * FROM ExampleTable WHERE KeyColumn >= 7001 AND KeyColumn < 8000
SELECT * FROM ExampleTable WHERE KeyColumn >= 8001 AND KeyColumn < 9000
SELECT * FROM ExampleTable WHERE KeyColumn >= 9001 AND KeyColumn < 10000
And then it would put the results of each of those queries in its own partition in Spark.
问题 :-)
如果我指定了这些选项,我如何确保分区
即使 partitionColumn 不均匀,大小也大致相同
分散式?
我的第 1 位和第 20 位执行者会获得大部分工作,而其他
18个执行者坐在那里,大部分都是闲置的?
如果是这样,有没有办法防止这种情况发生?
所有问题都有一个答案
下面是方法...
1)您需要了解每个分区有多少记录/行数......基于此您可以repartition或coalesce
片段 1:Spark 1.6 >
spark 2.x 提供了了解分区中有多少记录的工具。
spark_partition_id() 存在于org.apache.spark.sql.functions
import org.apache.spark.sql.functions._
val df = "<your dataframe read through rdbms.... using spark.read.jdbc>"
df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show
片段 2:适用于所有版本的 spark
df
.rdd
.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
.toDF("partition_number","NumberOfRecordsPerPartition")
.show
然后您需要再次合并您的策略在范围之间进行查询调整或重新分区等......,您可以使用 mappartitions 或 foreachpartitions
结论:我更喜欢使用适用于数字列的给定选项,因为我已经看到它将数据划分为统一的
边界/分区。
有时可能无法手动使用这些选项
需要调整分区/并行度...
更新:
With the below we can achive uniform distribution...
- 获取表的主键。
- 找出关键的最小值和最大值。
- 使用这些值执行 Spark。
def main(args: Array[String]){
// parsing input parameters ...
val primaryKey = executeQuery(url, user, password, s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)
val result = executeQuery(url, user, password, s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
val min = result.getString(1).toInt
val max = result.getString(2).toInt
val numPartitions = (max - min) / 5000 + 1
val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()
var df = spark.read.format("jdbc").
option("url", s"${url}${config("schema")}").
option("driver", "com.mysql.jdbc.Driver").
option("lowerBound", min).
option("upperBound", max).
option("numPartitions", numPartitions).
option("partitionColumn", primaryKey).
option("dbtable", config("table")).
option("user", user).
option("password", password).load()
// some data manipulations here ...
df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)
}