【问题标题】:Spark Partition Dataset By Column ValueSpark 按列值对数据集进行分区
【发布时间】:2019-05-02 18:21:01
【问题描述】:

(我是 Spark 新手)我需要存储大量数据行,然后处理这些数据的更新。我们为这些行提供了唯一的 ID (DB PK),我们希望通过 uniqueID % numShards 对数据集进行分片,以创建大小相等的可寻址分区。由于 PK(唯一 ID)存在于数据和更新文件中,因此很容易确定将更新哪个分区。我们打算按照相同的标准对数据和更新进行分片,并定期重写“分片 S + 为分片 S 累积的所有更新 => 新分片 S”。 (我们知道如何组合 shard S + updates = new shard S。)

如果这是我们的设计,我们需要 (1) 将 DataFrame 按其列之一(例如:K 列)分片到 |range(K)| 分区中,以保证分区中的所有行具有相同的值在 K 列和 (2) 中能够找到对应于 column_K=k 的 Parquet 文件,知道k = row.uniqueID % numShards

这是一个好的设计,还是 Spark 提供了一些开箱即用的东西,让我们的任务变得更容易?

我们应该使用哪个 Spark 类/方法来对数据进行分区?我们正在查看RangePartitioner,但构造函数正在询问分区数。我们要指定“使用 column_K 进行分区,并为每个不同的值创建一个分区 k in range(K)”,因为我们已经创建了 column_K = uniqueID % numShards。哪个分区器适合拆分DataFrame 的一列的值?我们需要创建自定义分区器,还是使用partitionBy,或repartitionByRange,还是...?

这是我们目前所拥有的:

import org.apache.spark.sql.functions._
val df = spark.read
.option("fetchsize", 1000)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc(jdbc_url, "SCHEMA.TABLE_NAME", partitions, props)
.withColumn("SHARD_ID", col("TABLE_PK") % 1024)
.write
.parquet("parquet/table_name")

现在我们需要指定这个DataFrame 应该被SHARD_ID 分区,然后才能写成 Parquet 文件。

【问题讨论】:

    标签: scala apache-spark sharding data-partitioning


    【解决方案1】:

    这行得通:

    val df = spark.read
    .option("fetchsize", 1000)
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .jdbc(jdbc.getString("url"), "SCHEMA.TABLE_NAME", partitions, props)
    .withColumn("SHARD_ID", col("TABLE_PK") % 1024)
    .write
    .partitionBy("SHARD_ID")
    .parquet("parquet/table_name")
    

    【讨论】:

      猜你喜欢
      • 2021-02-05
      • 1970-01-01
      • 1970-01-01
      • 2017-07-05
      • 2016-10-15
      • 2018-06-18
      • 1970-01-01
      • 2019-03-13
      • 2021-12-09
      相关资源
      最近更新 更多