【发布时间】:2020-04-23 12:07:42
【问题描述】:
我有一个包含 n 条记录的 oracle 表,现在我想使用 where/filter 条件从该表加载数据以触发数据帧。我不想将完整的数据加载到数据框中,然后对其应用过滤器。 spark.read.format("jdbc")...etc 或任何其他解决方案中是否有任何选项?
【问题讨论】:
标签: scala apache-spark
我有一个包含 n 条记录的 oracle 表,现在我想使用 where/filter 条件从该表加载数据以触发数据帧。我不想将完整的数据加载到数据框中,然后对其应用过滤器。 spark.read.format("jdbc")...etc 或任何其他解决方案中是否有任何选项?
【问题讨论】:
标签: scala apache-spark
检查下面的代码。您可以在查询变量中编写自己的查询。要并行处理或加载数据,您可以检查 partitionColumn、lowerBound 和 upperBound 列。
val query = """
(select columnA,columnB from table_name
where <where conditions>) table
"""
val options = Map(
"url" -> "<url>".
"driver" -> "<driver class>".
"user" -> "<user>".
"password" -> "<password>".
"dbtable" -> query,
"partitionColumn" -> "",
"lowerBound" -> "<lower bound values>",
"upperBound" -> "<upper bound values>"
)
val df = spark
.read
.format("jdbc")
.options(options)
.load()
【讨论】:
(select in parenthesis) AS table。
试试这个
val sourceDf = spark.read.format("jdbc").option("driver", driver).option("url", url).option("dbtable", "(select * from dbo.employee c where c.joindate > '2018-11-19 00:00:00.000') as subq").option("numPartitions", 6).option("partitionColumn", "depId").option("lowerBound", 1).option("upperBound", 100).option("user", user).option("password", pass).load()
它将启用 where 条件以及分区
【讨论】:
Spark 确实支持 JDBC 源的谓词下推。
您可以简单地使用spark.read.format("jdbc") 加载数据帧并在该df 之上使用.where() 运行过滤器,然后您可以检查是否应用了spark SQL 谓词下推。
在 SparkSQL 中,您可以看到针对 db 运行的确切查询,并且您会发现添加了 WHERE 子句。
所以你不需要为它添加任何额外的东西。
更多详情请参考databrickshttps://docs.databricks.com/data/data-sources/sql-databases.html#push-down-optimization这篇文章
【讨论】:
您可以为此用例使用以下选项。参考link
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
根据查询条件创建数据框:
pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)
【讨论】: