【发布时间】:2019-01-07 00:21:26
【问题描述】:
我们使用的是 Spark 2.2.0。我们在 hive 表中有 1.5 TB 的数据。我们有 80 个节点集群,每个节点有大约 512 GB RAM 和 40 个内核。
我正在使用 Spark SQL 访问这些数据。使用普通的 Spark SQL(没有缓存)简单的命令,比如获取特定列值的不同计数大约需要 13 秒。但是当我在缓存表后运行相同的命令时,需要 10 多分钟。不知道是什么问题?
export SPARK_MAJOR_VERSION=2
spark-shell --master yarn --num-executors 40 --driver-memory 5g --executor-memory 100g --executor-cores 5
spark.conf.set("spark.sql.shuffle.partitions", 10)
val df = spark.sql("select * from analyticalprofiles.customer_v2")
df.createOrReplaceTempView("tmp")
spark.time(spark.sql("select count(distinct(household_number)) from tmp").show())
>> Time taken: 13927 ms
import org.apache.spark.storage.StorageLevel
val df2 = df.persist(StorageLevel.MEMORY_ONLY)
df2.createOrReplaceTempView("tmp2")
spark.time(spark.sql("select count(distinct(household_number)) from tmp2").show())
>> 1037482 ms ==> FIRST TIME - okay if this is more
spark.time(spark.sql("select count(distinct(household_number)) from tmp2").show())
>> 834740 ms ==> SECOND TIME - Was expecting much faster execution ???
用“spark.catalog.cacheTable("tmp")”尝试了同样的事情,但缓存查询仍然需要更多时间。不知道为什么???有人可以帮忙吗???
df2.storageLevel.useMemory
res6: Boolean = true
sc.getPersistentRDDs
res8: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(12 -> In-memory table tmp MapPartitionsRDD[12] at cacheTable at <console>:24)
spark.conf.get("spark.sql.inMemoryColumnarStorage.compressed")
res11: String = true
spark.conf.get("spark.sql.inMemoryColumnarStorage.batchSize")
res12: String = 10000
spark.catalog.isCached("tmp")
res13: Boolean = true
【问题讨论】:
-
你能检查分区数吗?我想检查它是否从内存溢出并试图将其写入磁盘?如果是这样,所花费的时间可能是由于涉及写入的 I/O。
-
似乎很多人都面临着这样的问题。不保证可以保留所有内容,而且我假设集群是共享资源。
标签: apache-spark hadoop dataframe caching