【问题标题】:Spark RDD and Dataframe transformation optimisationSpark RDD 和 Dataframe 转换优化
【发布时间】:2020-11-29 06:32:57
【问题描述】:

我是 Spark 的新手,并且有以下关于 RDD 和 Dataframes 的高级问题,如果我没记错的话,它们是建立在 RDDs 之上的:

我了解可以对 RDD 执行两种类型的操作,即转换和操作。我也明白,只有在对作为该转换产品的 RDD 执行操作时,才会执行转换。鉴于 RDD 在内存中,我想知道是否有可能优化这些 RDD 消耗的内存量,举个例子:

KafkaDF = KafkaDFRaw.select(
        KafkaDFRaw.key,
        KafkaDFRaw.value,
        KafkaDFRaw.topic,
        unix_timestamp('timestamp',
                       'yyyy-MM-dd HH:mm:ss').alias('kafka_arrival_time')
    ).withColumn("spark_arrival_time", udf(time.time, DoubleType())())

我有一个 KafkaDFRaw 数据帧,并生成了一个名为 KafkaDF 的新 RDD。然后我希望将列添加到这个新的 RDD。我应该将它们添加到现有的 RDD 中吗?像这样:

decoded_value_udf = udf(lambda value: value.decode("utf-8"))
    KafkaDF = KafkaDF\
        .withColumn(
            "cleanKey", decoded_value_udf(KafkaDF.key))\
        .withColumn(
            "cleanValue", decoded_value_udf(KafkaDF.value))

或者我应该从上一个数据框创建一个新的数据框?像这样:

decoded_value_udf = udf(lambda value: value.decode("utf-8"))
    KafkaDF_NEW = KafkaDF\
        .withColumn(
            "cleanKey", decoded_value_udf(KafkaDF.key))\
        .withColumn(
            "cleanValue", decoded_value_udf(KafkaDF.value))

这对内存优化有影响吗?

提前感谢您的帮助。

【问题讨论】:

  • 您的两个选项是相同的。 RDD 和数据帧是不可变的,第一个选项只是将新的 rdd 重新分配给同一个变量。另一个注意事项 - 您不能在任务中创建新的 rdd,因此如果您打算分发 udf,那么它就不会起作用。
  • @mazaneicha。感谢您的回复和反馈。我明白你在说什么,但是你将如何分发它?
  • 新的rdds /dataframes只能在定义了SparkContext的驱动上创建。

标签: python apache-spark apache-spark-sql rdd


【解决方案1】:

每当调用该操作时,都会执行优化的 dag 并按照计划使用内存。 你可以对比执行计划来了解:

df.explain(true)
df_new.explain(true)

在两者之间创建额外的变量来保存转换不会影响内存利用率。内存需求取决于数据大小、分区大小、shuffle 等。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-03-05
    • 2017-06-02
    • 1970-01-01
    • 2017-06-13
    • 2018-10-21
    • 1970-01-01
    • 2016-05-12
    • 2019-05-14
    相关资源
    最近更新 更多