【问题标题】:Efficient way to add UUID in pyspark [duplicate]在pyspark中添加UUID的有效方法[重复]
【发布时间】:2020-06-23 16:08:15
【问题描述】:

我有一个 DataFrame,我想添加一列不同的 uuid4() 行。我的代码:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

from uuid import uuid4

spark_session = SparkSession.builder.getOrCreate()

df = spark_session.createDataFrame([
        [1, 1, 'teste'],
        [2, 2, 'teste'],
        [3, 0, 'teste'],
        [4, 5, 'teste'],
    ],
    list('abc'))


df = df.withColumn("_tmp", f.lit(1))

uuids = [str(uuid4()) for _ in range(df.count())]
df1 = spark_session.createDataFrame(uuids, StringType())
df1 = df_1.withColumn("_tmp", f.lit(1))


df2 = df.join(df_1, "_tmp", "inner").drop("_tmp")
df2.show()

但我遇到了这个错误:

Py4JJavaError: An error occurred while calling o1571.showString.
: org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans

我已经尝试使用别名并使用 monotonically_increasing_id 作为连接列,但我看到了 here 我不能相信 monotonically_increasing_id 作为合并列。 我期待:

+---+---+-----+------+
|  a|  b|    c| value|
+---+---+-----+------+
|  1|  1|teste| uuid4|
|  2|  2|teste| uuid4|
|  3|  0|teste| uuid4|
|  4|  5|teste| uuid4|
+---+---+-----+------+

在这种情况下正确的方法是什么?

【问题讨论】:

  • 您可以将 uuid4() 注册为 udf 并在 spark 中调用它。如果你想坚持你的方法,请使用 row_number() 给你行号并加入它。
  • Spark 明智地warning 你,你的加入被识别为笛卡尔积。这是因为左侧数据集将匹配右侧数据集的任何列,从而创建笛卡尔积。确实这是意料之中的,因为您将 df 与 df1 连接在一个列上,这两个数据集的值相同,相当于 crossJoin。
  • @AlexandrosBiratsis,你推荐什么方法?
  • @Tetlanesh 与 row_number 一起工作!
  • 最简单的方法是在表达式中使用 spark sql 的 uuid 函数,像这样:df.withColumn("uuid", f.expr("uuid()"))

标签: python-3.x apache-spark pyspark


【解决方案1】:

我按照@Tetlanesh 的建议使用 row_number。我必须创建一个 ID 列以确保 row_number 计算 Window 的每一行。

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from uuid import uuid4
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

from pyspark.sql.functions import row_number


spark_session = SparkSession.builder.getOrCreate()

df = spark_session.createDataFrame([
        [1, 1, 'teste'],
        [1, 2, 'teste'],
        [2, 0, 'teste'],
        [2, 5, 'teste'],
    ],
    list('abc'))

df = df.alias("_tmp")
df.registerTempTable("_tmp")

df2 = self.spark_session.sql("select *, uuid() as uuid from _tmp")

df2.show()

另一种方法是使用windows,但效率不如第一种:


df = df.withColumn("_id", f.lit(1))
df = df.withColumn("_tmp", row_number().over(Window.orderBy('_id')))

uuids = [(str(uuid4()), 1) for _ in range(df.count())]
df1 = spark_session.createDataFrame(uuids, ['uuid', '_id'])
df1 = df1.withColumn("_tmp", row_number().over(Window.orderBy('_id')))


df2 = df.join(df1, "_tmp", "inner").drop('_id')

df2.show()

两个输出:

+---+---+-----+------+
|  a|  b|    c|  uuid|
+---+---+-----+------+
|  1|  1|teste| uuid4|
|  2|  2|teste| uuid4|
|  3|  0|teste| uuid4|
|  4|  5|teste| uuid4|
+---+---+-----+------+

【讨论】:

  • 你可以在窗口声明中跳过.partitionBy('_id'),因为_id全是1,它根本没有分区
  • 谢谢!在这里工作!我现在要编辑
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-03-21
  • 2020-08-09
  • 2018-02-09
  • 1970-01-01
  • 2020-04-13
  • 2021-08-21
相关资源
最近更新 更多