【问题标题】:Strange issue with spark caching火花缓存的奇怪问题
【发布时间】:2019-01-07 00:21:26
【问题描述】:

我们使用的是 Spark 2.2.0。我们在 hive 表中有 1.5 TB 的数据。我们有 80 个节点集群,每个节点有大约 512 GB RAM 和 40 个内核。

我正在使用 Spark SQL 访问这些数据。使用普通的 Spark SQL(没有缓存)简单的命令,比如获取特定列值的不同计数大约需要 13 秒。但是当我在缓存表后运行相同的命令时,需要 10 多分钟。不知道是什么问题?

export SPARK_MAJOR_VERSION=2
spark-shell --master yarn --num-executors 40 --driver-memory 5g --executor-memory 100g --executor-cores 5
spark.conf.set("spark.sql.shuffle.partitions", 10)
val df = spark.sql("select * from analyticalprofiles.customer_v2")
df.createOrReplaceTempView("tmp")
spark.time(spark.sql("select count(distinct(household_number)) from tmp").show())
>> Time taken: 13927 ms



import  org.apache.spark.storage.StorageLevel
val df2 = df.persist(StorageLevel.MEMORY_ONLY)
df2.createOrReplaceTempView("tmp2")
spark.time(spark.sql("select count(distinct(household_number)) from tmp2").show())
>> 1037482 ms ==> FIRST TIME - okay if this is more
spark.time(spark.sql("select count(distinct(household_number)) from tmp2").show())
>> 834740 ms  ==> SECOND TIME - Was expecting much faster execution ???

用“spark.catalog.cacheTable("tmp")”尝试了同样的事情,但缓存查询仍然需要更多时间。不知道为什么???有人可以帮忙吗???

df2.storageLevel.useMemory
res6: Boolean = true

sc.getPersistentRDDs
res8: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(12 -> In-memory table tmp MapPartitionsRDD[12] at cacheTable at <console>:24)

spark.conf.get("spark.sql.inMemoryColumnarStorage.compressed")
res11: String = true

spark.conf.get("spark.sql.inMemoryColumnarStorage.batchSize")
res12: String = 10000

spark.catalog.isCached("tmp")
res13: Boolean = true

【问题讨论】:

  • 你能检查分区数吗?我想检查它是否从内存溢出并试图将其写入磁盘?如果是这样,所花费的时间可能是由于涉及写入的 I/O。
  • 似乎很多人都面临着这样的问题。不保证可以保留所有内容,而且我假设集群是共享资源。

标签: apache-spark hadoop dataframe caching


【解决方案1】:

您可以尝试以下方法。

  1. 您可以使用以下公式增加执行器的数量并减少执行器的内存

      SPARK_EXECUTOR_CORES (--executor-cores) : 5 
    
      Number of Executors (--num-executors) : (number of nodes) *  (number of cores) /(executor cores) -1 (for Application Master) = (80*40)/5 ~ 640-1 = 639
    
      SPARK_EXECUTOR_MEMORY (--executor-memory): Memory/(Number of Executors/Number of Nodes):  512/(639/80) ~ 64 GB
    
  2. 如果要持久化数据帧,请使用 StorageLevel.MEMORY_AND_DISK_SER 。如果内存(RAM)已满,它将保存在磁盘中。

希望对你有帮助。

【讨论】:

  • 谢谢,我尝试了 MEMORY_ONLY 和 MEMORY_AND_DISK_SER。使用 MEMORY_ONLY 它只缓存 18% 的数据,但是当我使用 MEMORY_AND_DISK_SER 时,它缓存了 100% - 18% 在内存中并保留在磁盘上。但是我又发现了一个问题。我在磁盘上的数据大小约为 1.5 TB(以 Parquet 格式),但是当我使用 Sparks MEMORY_AND_DISK_SER 选项读取它时,它占用了大约 11 TB(内存中为 2 TB,磁盘中为 9 TB)。为什么会这样?我认为序列化的数据在缓存时会占用更少的空间?有什么办法可以压缩缓存的数据,或者至少让它在内存/磁盘中占用更少的空间?
猜你喜欢
  • 1970-01-01
  • 2020-07-18
  • 2021-04-04
  • 1970-01-01
  • 2012-08-30
  • 2013-01-11
  • 2016-08-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多