【问题标题】:spark, scala & jdbc - how to limit number of recordsspark, scala & jdbc - 如何限制记录数
【发布时间】:2018-04-09 15:24:29
【问题描述】:

有没有办法限制使用 spark sql 2.2.0 从 jdbc 源获取的记录数?

我正在处理将大量 >200M 记录从一个 MS Sql Server 表移动(和转换)到另一个的任务:

val spark = SparkSession
    .builder()
    .appName("co.smith.copydata")
    .getOrCreate()

val sourceData = spark
    .read
    .format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", jdbcSqlConnStr)
    .option("dbtable", sourceTableName)
    .load()
    .take(limit)

虽然它有效,但它显然首先从数据库中加载所有 200M 记录,首先花费了 18 分钟的时间,然后返回我希望用于测试和开发目的的有限数量的记录。

在 take(...) 和 load() 之间切换会产生编译错误。

我很欣赏可以将示例数据复制到较小的表中、使用 SSIS 或其他 etl 工具的方法。

我真的很好奇是否有办法使用 spark、sql 和 jdbc 来实现我的目标。

【问题讨论】:

    标签: sql scala apache-spark jdbc


    【解决方案1】:

    为了限制下载的行数,可以使用 SQL 查询来代替“dbtable”中的表名。 Description in documentation.

    在查询中可以指定“where”条件,例如,使用特定于服务器的功能来限制行数(如 Oracle 中的“rownum”)。

    【讨论】:

    • 我尝试了两种使用 sql 语句而不是表名的方法,并得到一个错误 'invalid syntax near select...'。我尝试使用 val sqlDF = spark.sql("...") 方法选择前 100 条记录,但出现错误“无效 spark sql 语法”或类似的错误。
    • 查询括号必须使用别名,请看:stackoverflow.com/questions/43174838/…
    • 太棒了!以下作品! //源数据 val jdbcSourceConnection = s"jdbc:sqlserver://$sourceDbHost;databaseName=$sourceDbName;user=$sourceDbUsername;password=$sourceDbPassword;" val sourceData = spark .read .format("jdbc") .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") .option("url", jdbcSourceConnection) // .option("dbtable", sourceTableName) .option("dbtable", "(select top 100 * from Customer.Preference) as table1") .load .sort("CustomerID")
    • 虽然这有效,但您可以使用限制完成类似的操作 - 请参阅我的答案。
    【解决方案2】:

    我没有对此进行测试,但您应该尝试使用limit 而不是taketake 调用 head 在封面下有以下注释:

    只有在预期结果数组的情况下才应使用此方法 很小,因为所有数据都加载到驱动程序的内存中。

    limit 导致 LIMIT 被推入 sql 查询,因为它是一个惰性求值:

    这个函数和head的区别在于head是一个动作,返回一个数组(通过触发查询执行),而limit返回一个新的Dataset。

    如果您想要数据而不先将其拉入,那么您甚至可以执行以下操作:

    ...load.limit(limitNum).take(limitNum)
    

    【讨论】:

    • 我已经使用 Cassandra 作为数据源的确切方法,但对于 Sql Server jdbc,它对我不起作用。也许我应该进一步研究一下,谢谢你的建议!
    • 不:加载我要求的 100 条记录需要很长时间。在这种情况下,火花似乎首先加载所有记录,然后再应用限制。 ``` val sourceData = spark .read .format("jdbc") .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") .option("url", jdbcSourceConnection) // .option(" dbtable", s"(select top ${config.sourceLimit} * from ${config.sourceTableName}) as table1") .option("dbtable", config.sourceTableName) .load .limit(config.sourceLimit.toInt) 。排序(“客户 ID”)```
    【解决方案3】:

    这种方法对于关系数据库来说有点糟糕。 spark的加载函数会请求你的完整表,存储在内存/磁盘中,然后会做RDD转换和执行。

    如果您想做探索性工作,我建议您在第一次加载时存储这些数据。有几种方法可以做到这一点。获取您的代码并执行以下操作:

    val sourceData = spark
        .read
        .format("jdbc")
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .option("url", jdbcSqlConnStr)
        .option("dbtable", sourceTableName)
        .load()
    sourceData.write
        .option("header", "true")
        .option("delimiter", ",")
        .format("csv")
        .save("your_path")
    

    这将允许您将本地计算机中的数据保存为 CSV,这是您可以使用任何语言进行探索的最常见格式。 每次要加载此文件时,请从此文件中获取此数据。如果您想要实时分析或任何其他类似的东西。我建议您使用数据转换构建一个管道以更新另一个存储。每次使用这种方法处理从数据库加载的数据并不好。

    【讨论】:

    • 感谢您的详细评论!
    猜你喜欢
    • 2021-04-19
    • 2021-12-22
    • 2016-02-22
    • 2018-02-02
    • 2018-07-16
    • 2022-07-21
    • 2015-05-26
    • 2017-05-26
    • 2020-12-17
    相关资源
    最近更新 更多