【问题标题】:Read from sql database in parallel using spark without knowing upper bound在不知道上限的情况下使用 spark 从 sql 数据库中并行读取
【发布时间】:2017-07-11 04:04:02
【问题描述】:

Spark 允许您从 sql db 源并行读取,并且可以基于滑动窗口进行分区,例如(来自book,第 7 章)

 val colName = "count"
 val lowerBound = 0L
 val upperBound = 348113L // this is the max count in our table
 val numPartitions = 10

 spark.read.jdbc(url,
                tablename,
                colName,
                lowerBound,
                upperBound,
                numPartitions,
                props).count()

在这里,上限是事先知道的。

比方说,一张表在一天内获得了“x”行(可能在 1-2 百万之间),在一天结束时,我们提交了一个 spark 作业,进行一些转换并写入 Parquet/ CSV/JSON。如果我们事先不知道将有多少行(从 1 到 2 百万不等)写入 SQL 源数据库,那么在这种情况下,进行分区的最佳方法或做法是什么。

一种方法是估计您的上限,但我不确定这是一种正确的方法。

【问题讨论】:

    标签: mysql apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    我遇到了完全相同的问题。我也需要随机分布。所以我选择一个 int 列并在其上获取 mod 10。这样我就可以在不关心的情况下分割任何东西 界限或分布。

    options += ("numPartitions" -> numPartitions,"partitionColumn"->"mod(my_int_column,10)","lowerBound"->"0","upperBound"->"9")
    

    【讨论】:

    • 此解决方案在 Spark 2.4.0 中不再适用,因为 partitionColumn 现在已解析以查明它们是否存在于架构中 (github.com/apache/spark/commit/…)。作为替代,您可以使用predicates 参数(spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/…)。生成这样的谓词:Array.range(0, partitionCount).map(partNumber => s"mod(my_int_column, $partitionCount) = $partNumber").
    • 关于 Spark 2.4,您可以将此列添加为您正在执行的查询的一部分:Select mod(id,10) as partition_key,* from table,然后将用户 partition_key 作为分区列
    【解决方案2】:

    每天的行数都不同这一事实并没有太大变化。假设您想要有 20 个分区,那么有一天您在单个分区中大约有 1M/20 行,而另一天大约有 2M/20。如果有更多的数据并且分区的数量是固定的,那么显然分区中会有更多的数据。

    如果对下限和上限有混淆,我想澄清一下,lowerBound 和 upperBound 指的是您要分区的列的值,而不是行数。此外,表格不会根据这些值进行过滤,这意味着如果您的行的值小于 lowerBound 或大于 upperBound,这些行仍将被包括在内。

    见: Whats meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?

    来自文档:http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

    partitionColumn、lowerBound、upperBound:如果指定了这些选项中的任何一个,则必须全部指定。此外,必须指定 numPartitions。他们描述了从多个工作人员并行读取时如何对表进行分区。 partitionColumn 必须是相关表中的数字列。请注意,lowerBound 和 upperBound 仅用于决定分区步长,而不是用于过滤表中的行。所以表中的所有行都会被分区并返回。此选项仅适用于阅读。

    【讨论】:

    • 是的,我明白分区是基于列的值(这就是传递列名的原因)。但不是做滑动窗口分区,而是 spark 可以为 JDBC 源做自动分区(就像它为 CSV/Parquet 做的那样)?
    • 您可以不指定 partitionColumn、lowerBound、upperBound 和 numPartitions。然后 spark 将打开一个到您的 sql server 的单个 jdbc 连接,并按照它认为合适的方式进行分区。所以它几乎肯定是一个性能明显差的解决方案(但在你的 sql 服务器上更容易)。在我给出的文档链接中,有关于通过 jdbc 查询的 sn-ps 而不指定 partitionColumn 和边界。
    • 不给出任何界限,spark 将创建 1 个分区(使用 df.rdd.partition.size 检查),这无济于事,因为 spark 不会自行进行任何类型的分区。
    • @ds_user 该问题中的回答者写道,以回应未包含所有实际运行的查询的其他回答者。因此,该问题中公认的答案和我说的是同一件事 - 无论您选择什么边界,您都不会丢失任何数据。
    • 他说“实际上上面的列表遗漏了一些东西,特别是第一个和最后一个查询。没有它们你会丢失一些数据”。但他确实包含了第一个和最后一个查询,因此不会丢失任何数据。如果您有“SELECT * FROM table WHERE partitionColumn > 9000”,您将如何丢失超出上限的数据?
    猜你喜欢
    • 1970-01-01
    • 2017-01-12
    • 1970-01-01
    • 2015-08-30
    • 2015-08-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多