【问题标题】:Can we give a Sequence Id for rows in Spark?我们可以为 Spark 中的行提供序列 ID 吗?
【发布时间】:2020-05-28 13:33:21
【问题描述】:

我是 spark 新手,我在数据文件中有大约 10000 行要读取

SparkSession sessionSpark = SparkSession
            .builder()
            .config(sparkConf)
            .getOrCreate();


Dataset<Row> dataset = sessionSpark.read.parquet("s3://databucket/files/")

我有一个用例为数据集中的每一行添加一个行号,行号应该从 1 到 10000 开始(因为文件有 10000 条记录),是否可以分配一个行号,我们知道 spark shuffles数据,但假设即使从应用程序重新运行同一个文件两次,生成的行号应该是相同的,有可能吗?

【问题讨论】:

  • 如果你使用 monotonically_increasing_id 函数,它有一些缺点以及如何解决这个问题..查看这篇文章 - stackoverflow.com/questions/48209667/…
  • 我会在 scala 中发布更完整的答案
  • 可能是一个接受答案的想法。

标签: java apache-spark


【解决方案1】:

monotonically_increasing_id() 将为您的行添加增量 ID

import org.apache.spark.sql.functions._
Dataset<Row> dataset = sessionSpark.read.parquet("s3://databucket/files/").withColumn("rowNum", monotonically_increasing_id())

来自官方 Spark 文档

生成单调递增 64 位的列表达式 整数。

生成的 ID 保证单调递增且 唯一的,但不是连续的。当前的实现将 高 31 位的分区 ID,以及每个分区内的记录号 分区在低 33 位。假设是数据框 少于10亿个分区,每个分区少于8个 十亿条记录。

例如,考虑一个有两个分区的DataFrame,每个分区有 3 个 记录。此表达式将返回以下 ID:

{{{ 0, 1, 2, 8589934592 (1L

【讨论】:

  • 您是否 100% 确定这会使 ID 始终保持不变?像 ID 1 将永远是记录 X?
  • 那么你引用的意思是,如果我改变分区的数量,我会得到不同的ID?我自己对此很感兴趣,因为这是我拥有的一个用例,并且我使用哈希来 100% 确定它是相同的。
  • no 引号意味着 id 将是唯一的编号。对于任何数据框/数据集中的每一行。它只是跨分区分配唯一的 id 集,以加快计算速度。
  • 酷。我会亲自使用它,你应该得到 +1,但我认为这并不能满足要求。它不是 1 到 1000,连续的。它是“随机的”。
  • 对我得到的 -1 票感到恼火,这确实是 zipWithIndex 回答了你的问题。
【解决方案2】:

EDIT:符合ID连续且从1开始的解决方案

如果您可以通过某种方式订购它们,那应该是可能的。这个例子可能是scala,但主要部分还是SQL部分。

val df = sc.parallelize(Seq(("alfa", 10), ("beta", 20), ("gama", 5))).toDF("word", "count")
df.createOrReplaceTempView("wordcount")

// MAIN PART
val tmpTable = spark.sqlContext.sql("select row_number() over (order by count) as index,word,count from wordcount")

tmpTable.show()

+-----+----+-----+
|index|word|count|
+-----+----+-----+
|    1|gama|    5|
|    2|alfa|   10|
|    3|beta|   20|
+-----+----+-----+

编辑:如果您不需要纯数字,请使用行哈希。这样更好。

【讨论】:

  • 如果没有 partition by 子句,这不会将所有数据拉到一个可能导致性能问题的分区吗?我通过在 WindowSpec 中添加一个虚拟 partitionBy(lit(0)) 子句解决了使用 dense_rank 时的特定问题
【解决方案3】:

不是在 Java 中,因为我不专攻那个,而是在 Scala 中。应该很容易为您转换。只是我在案例类中使用 DS 的一个例子:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._

// Gen some example data via DF, can come from files, ordering in those files assumed. I.e. no need to sort.
val df = Seq(
  ("1 February"), ("n"), ("c"), ("b"), 
  ("2 February"), ("hh"), ("www"), ("e"), 
  ("3 February"), ("y"), ("s"), ("j"),
  ("1 March"), ("c"), ("b"), ("x"),
  ("1 March"), ("c"), ("b"), ("x"),
  ("2 March"), ("c"), ("b"), ("x"),
  ("3 March"), ("c"), ("b"), ("x"), ("y"), ("z")
           ).toDF("line")

// Define Case Classes to avoid Row aspects on df --> rdd --> to DF. 
case class X(line: String)   
case class Xtra(key: Long, line: String)

// Add the Seq Num using zipWithIndex. Then convert back, but will have a struct to deal wit.
// You can avoid the struct if using Row and such. But general idea should be clear.
val rdd = df.as[X].rdd.zipWithIndex().map{case (v,k) => (k,v)}
val ds = rdd.toDF("key", "line").as[Xtra]
ds.show(100,false)

返回:

+---+------------+
|key|line        |
+---+------------+
|0  |[1 February]|
|1  |[n]         |  
|2  |[c]         |
...

迄今为止的答案不满足问题提供的需求,但如果只有 10K 行,那么单个分区不是问题。尽管对于 10K 行,人们必须提出几个问题。

如果你不介意 Row,这里有另一种方法:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}

val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId =  spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)

【讨论】:

  • @thebluephanton zipWithIndex 如果我们多次运行同一个文件,是否保证相同的顺序..例如:第一次运行的 ID 在第二次运行中是否也总是相同的 ID?
  • 假设内容相同,并且没有重新生成,是的。
  • 而且还没有洗牌
  • zipwithinded 是 rdds still.zwi 的一个原因是窄转换。读取文件具有理解序列的块、分区指针
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-06-07
  • 1970-01-01
  • 2020-05-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多