【问题标题】:When to cache a DataFrame?何时缓存 DataFrame?
【发布时间】:2025-12-24 00:30:07
【问题描述】:

我的问题是,我应该什么时候做 dataframe.cache() 什么时候有用?

另外,我应该在我的代码中缓存注释行中的数据帧吗?

注意:我的数据帧是从 Redshift 数据库加载的。

非常感谢

这是我的代码:

def sub_tax_transfer_pricing_eur_aux(manager, dataframe, seq_recs, seq_reservas, df_aux):
    df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta'])
    df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa'])

    dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
        .filter(dataframe.seq_reserva.isin(seq_reservas))

    ##################################################
    #SHOULD I CACHE HERE df_vta, df_cpa and dataframe
    ##################################################

    dataframe = dataframe.join(df_vta, [dataframe.ind_tipo_imp_vta_fac == df_vta.ind_tipo_imp_vta,
                                        dataframe.cod_impuesto_vta_fac == df_vta.cod_impuesto_vta,
                                        dataframe.cod_clasif_vta_fac == df_vta.cod_clasif_vta,
                                        dataframe.cod_esquema_vta_fac == df_vta.cod_esquema_vta,
                                        dataframe.cod_empresa_vta_fac == df_vta.cod_emp_atlas_vta,
                                        ]).drop("ind_tipo_imp_vta", "cod_impuesto_vta", "cod_clasif_vta",
                                                "cod_esquema_vta", "cod_emp_atlas_vta") \
        .join(df_cpa, [dataframe.ind_tipo_imp_vta_fac == df_cpa.ind_tipo_imp_cpa,
                       dataframe.cod_impuesto_vta_fac == df_cpa.cod_impuesto_cpa,
                       dataframe.cod_clasif_vta_fac == df_cpa.cod_clasif_cpa,
                       dataframe.cod_esquema_vta_fac == df_cpa.cod_esquema_cpa,
                       dataframe.cod_empresa_vta_fac == df_cpa.cod_emp_atlas_cpa,
                       ]).drop("ind_tipo_imp_cpa", "cod_impuesto_cpa", "cod_clasif_cpa",
                               "cod_esquema_cpa", "cod_emp_atlas_cpa") \
        .select("seq_rec", "seq_reserva", "ind_tipo_regimen_fac", "imp_margen_canal", "ind_tipo_regimen_con",
                "imp_coste", "imp_margen_canco", "imp_venta", "pct_impuesto_vta", "pct_impuesto_cpa")

    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################

    dataframe = dataframe.withColumn("amount1",
                                     func.when(dataframe.ind_tipo_regimen_fac == 'E',
                                               dataframe.imp_margen_canal * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise(dataframe.imp_venta * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_venta - dataframe.imp_margen_canal) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.withColumn("amount2",
                                     func.when(dataframe.ind_tipo_regimen_con == 'E',
                                               dataframe.imp_margen_canco * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise((dataframe.imp_coste + dataframe.imp_margen_canco) * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_coste) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.na.fill({'amount1': 0})
    dataframe = dataframe.na.fill({'amount2': 0})

    dataframe = dataframe.join(df_aux, [dataframe.seq_rec == df_aux.operative_incoming,
                                        dataframe.seq_reserva == df_aux.booking_id])

    dataframe = dataframe.withColumn("impuesto_canco1", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount1))

    dataframe = dataframe.withColumn("impuesto_canco2", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount2))

    dataframe = dataframe.withColumn("impuesto_canco", dataframe.impuesto_canco1 + dataframe.impuesto_canco2)

    dataframe = dataframe.na.fill({'impuesto_canco': 0})

    dataframe = dataframe.select("operative_incoming", "booking_id", "impuesto_canco")
    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################
    dataframe = dataframe.groupBy("operative_incoming", "booking_id").agg({'impuesto_canco': 'sum'}). \
        withColumnRenamed("SUM(impuesto_canco)", "impuesto_canco")

    return dataframe

【问题讨论】:

  • 由于您没有调用多个操作,我认为您不应该缓存这些数据帧。

标签: python apache-spark pyspark apache-spark-sql


【解决方案1】:

在 Spark 中缓存 RDD:这是一种加速应用程序多次访问同一个 RDD 的机制。每次在该 RDD 上调用操作时,都会重新评估未缓存或检查点的 RDD。缓存 RDD 有两个函数调用:cache()persist(level: StorageLevel)。它们之间的区别在于cache()会将RDD缓存到内存中,而persist(level)可以根据级别指定的缓存策略缓存在内存、磁盘或堆外内存中。 不带参数的persist() 等同于cache()。我们将在本文后面讨论缓存策略。从 Storage 内存中释放空间由 unpersist() 执行。

何时使用缓存:正如本文所建议的,建议在以下情况下使用缓存:

  • 在迭代机器学习应用程序中重用 RDD
  • RDD 在独立 Spark 应用程序中的重用
  • 当 RDD 计算成本很高时,缓存可以帮助减少 一名执行人失败时的恢复成本

【讨论】:

  • “一个没有缓存也没有检查点的 RDD,每次在该 RDD 上调用一个动作时都会重新评估”——整个主题中最重要的一句话。
【解决方案2】:

实际上在你的情况下.cache() 根本没有帮助。 您没有对您的(至少不在您提供的函数中)数据框执行任何操作。如果您将多次使用数据,.cache() 是个好主意:

data = sub_tax_transfer_pricing_eur_aux(...).cache()
one_use_case = data.groupBy(...).agg(...).show()
another_use_case = data.groupBy(...).agg(...).show()

这样您将只获取一次数据(当第一个操作称为.show(),然后下一次使用data 数据帧时应该更快。但是,请谨慎使用 - 有时再次获取数据仍然更快。 另外,我建议不要一遍又一遍地为您的数据框命名相同的名称。毕竟,数据框是不可变的对象。

希望这会有所帮助。

【讨论】:

    【解决方案3】:

    什么时候应该做 dataframe.cache() 什么时候有用?

    cache 您将在查询中使用的内容(以及早期且通常取决于可用内存)。您使用哪种编程语言(Python 或 Scala 或 Java 或 SQL 或 R)并不重要,因为底层机制是相同的。

    您可以使用explain 运算符(其中InMemoryRelation 实体反映缓存的数据集及其存储级别)查看您的物理计划中是否缓存了DataFrame:

    == Physical Plan ==
    *Project [id#0L, id#0L AS newId#16L]
    +- InMemoryTableScan [id#0L]
          +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                +- *Range (0, 1, step=1, splits=Some(8))
    

    在您 cache(或 persist)您的 DataFrame 之后,第一个查询可能会变慢,但它会为以下查询带来回报。

    您可以使用以下代码检查数据集是否被缓存:

    scala> :type q2
    org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
    
    val cache = spark.sharedState.cacheManager
    scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
    res0: Boolean = false
    

    另外,我应该在我的代码中缓存注释行中的数据帧吗?

    是和不是。缓存代表外部数据集的内容,这样您就不必在每次查询它们时支付通过网络传输数据(同时访问外部存储)的额外费用。

    不要缓存只使用一次或易于计算的内容。否则,cache


    注意缓存的内容,即缓存的 Dataset,因为它会缓存不同的查询。

    // cache after range(5)
    val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")
    scala> q1.explain
    == Physical Plan ==
    *Filter ((id#0L % 2) = 0)
    +- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
          +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                +- *Range (0, 5, step=1, splits=8)
    
    // cache at the end
    val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache
    scala> q2.explain
    == Physical Plan ==
    InMemoryTableScan [id#17L]
       +- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
             +- *Filter ((id#17L % 2) = 0)
                +- *Range (0, 1, step=1, splits=8)
    

    Spark SQL 中的缓存有一个惊喜。缓存是惰性的,这就是为什么你要付出额外的代价来缓存第一个动作的行,但这只会发生在 DataFrame API 上。在 SQL 中,缓存是急切的,这会对查询性能产生巨大影响,因为您无需调用操作来触发缓存。

    【讨论】:

    • 即使我们在同一个 Spark 作业中多次重用数据帧/rdd,缓存是否有意义?我知道,如果所做的计算非常复杂,那么我们可以缓存数据帧以避免重新计算,以防执行程序死亡,即使我只使用了一次特定的数据帧。但在其他情况下,cahce 在同一个作业中多次使用的数据帧是否有意义?
    • @ravimalhotra 缓存数据集,除非您知道这是浪费时间 :) 换句话说,始终缓存在同一个作业中多次使用的数据帧。