【发布时间】:2019-05-02 18:21:01
【问题描述】:
(我是 Spark 新手)我需要存储大量数据行,然后处理这些数据的更新。我们为这些行提供了唯一的 ID (DB PK),我们希望通过 uniqueID % numShards 对数据集进行分片,以创建大小相等的可寻址分区。由于 PK(唯一 ID)存在于数据和更新文件中,因此很容易确定将更新哪个分区。我们打算按照相同的标准对数据和更新进行分片,并定期重写“分片 S + 为分片 S 累积的所有更新 => 新分片 S”。 (我们知道如何组合 shard S + updates = new shard S。)
如果这是我们的设计,我们需要 (1) 将 DataFrame 按其列之一(例如:K 列)分片到 |range(K)| 分区中,以保证分区中的所有行具有相同的值在 K 列和 (2) 中能够找到对应于 column_K=k 的 Parquet 文件,知道k = row.uniqueID % numShards。
这是一个好的设计,还是 Spark 提供了一些开箱即用的东西,让我们的任务变得更容易?
我们应该使用哪个 Spark 类/方法来对数据进行分区?我们正在查看RangePartitioner,但构造函数正在询问分区数。我们要指定“使用 column_K 进行分区,并为每个不同的值创建一个分区 k in range(K)”,因为我们已经创建了 column_K = uniqueID % numShards。哪个分区器适合拆分DataFrame 的一列的值?我们需要创建自定义分区器,还是使用partitionBy,或repartitionByRange,还是...?
这是我们目前所拥有的:
import org.apache.spark.sql.functions._
val df = spark.read
.option("fetchsize", 1000)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc(jdbc_url, "SCHEMA.TABLE_NAME", partitions, props)
.withColumn("SHARD_ID", col("TABLE_PK") % 1024)
.write
.parquet("parquet/table_name")
现在我们需要指定这个DataFrame 应该被SHARD_ID 分区,然后才能写成 Parquet 文件。
【问题讨论】:
标签: scala apache-spark sharding data-partitioning