【问题标题】:Load data with where clause in spark dataframe在 spark 数据框中使用 where 子句加载数据
【发布时间】:2020-04-23 12:07:42
【问题描述】:

我有一个包含 n 条记录的 oracle 表,现在我想使用 where/filter 条件从该表加载数据以触发数据帧。我不想将完整的数据加载到数据框中,然后对其应用过滤器。 spark.read.format("jdbc")...etc 或任何其他解决方案中是否有任何选项?

【问题讨论】:

标签: scala apache-spark


【解决方案1】:

检查下面的代码。您可以在查询变量中编写自己的查询。要并行处理或加载数据,您可以检查 partitionColumn、lowerBound 和 upperBound 列。

val query = """
  (select columnA,columnB from table_name
    where <where conditions>) table
"""  
val options = Map(
    "url"              -> "<url>".
    "driver"           -> "<driver class>".
    "user"             -> "<user>".
    "password"         -> "<password>".
    "dbtable"          -> query,
    "partitionColumn"  -> "",
    "lowerBound"       -> "<lower bound values>", 
    "upperBound"       -> "<upper bound values>"
)

val df = spark
        .read
        .format("jdbc")
        .options(options)
        .load()

【讨论】:

  • 我认为SQL应该有(select in parenthesis) AS table
【解决方案2】:

试试这个

val sourceDf = spark.read.format("jdbc").option("driver", driver).option("url", url).option("dbtable", "(select * from dbo.employee c where c.joindate  > '2018-11-19 00:00:00.000') as subq").option("numPartitions", 6).option("partitionColumn", "depId").option("lowerBound", 1).option("upperBound", 100).option("user", user).option("password", pass).load()

它将启用 where 条件以及分区

【讨论】:

    【解决方案3】:

    Spark 确实支持 JDBC 源的谓词下推。

    您可以简单地使用spark.read.format("jdbc") 加载数据帧并在该df 之上使用.where() 运行过滤器,然后您可以检查是否应用了spark SQL 谓词下推。

    在 SparkSQL 中,您可以看到针对 db 运行的确切查询,并且您会发现添加了 WHERE 子句。

    所以你不需要为它添加任何额外的东西。

    更多详情请参考databrickshttps://docs.databricks.com/data/data-sources/sql-databases.html#push-down-optimization这篇文章

    【讨论】:

      【解决方案4】:

      您可以为此用例使用以下选项。参考link

          jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
      connectionProperties = {
        "user" : jdbcUsername,
        "password" : jdbcPassword,
        "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      }
      

      根据查询条件创建数据框:

      pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
      df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
      display(df)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2011-03-18
        • 2020-12-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多