【发布时间】:2018-07-30 13:26:11
【问题描述】:
我在处理 spark 宽数据帧(大约 9000 列,有时更多)时遇到了问题。
任务:
- 通过 groupBy 和 pivot 创建宽 DF。
- 将列转换为向量并从 pyspark.ml 处理为 KMeans。
所以我制作了扩展框架并尝试使用 VectorAssembler 创建向量,将其缓存并对其进行 KMeans 训练。
在我的电脑上,在独立模式下,对于大约 500x9000 的帧,组装 7 个不同数量的集群大约需要 11 分钟,而 KMeans 需要 2 分钟。另一方面,pandas 中的这种处理(pivot df,并迭代 7 个集群)只需不到一分钟。
显然我理解独立模式和缓存等的开销和性能下降,但这真的让我气馁。
有人可以解释我如何避免这种开销吗?
人们如何使用宽 DF 而不是使用矢量汇编程序并降低性能?
更正式的问题(针对软规则)听起来像 - 我怎样才能加快这段代码的速度?
%%time
tmp = (df_states.select('ObjectPath', 'User', 'PropertyFlagValue')
.groupBy('User')
.pivot('ObjectPath')
.agg({'PropertyFlagValue':'max'})
.fillna(0))
ignore = ['User']
assembler = VectorAssembler(
inputCols=[x for x in tmp.columns if x not in ignore],
outputCol='features')
Wall time: 36.7 s
print(tmp.count(), len(tmp.columns))
552, 9378
%%time
transformed = assembler.transform(tmp).select('User', 'features').cache()
Wall time: 10min 45s
%%time
lst_levels = []
for num in range(3, 14):
kmeans = KMeans(k=num, maxIter=50)
model = kmeans.fit(transformed)
lst_levels.append(model.computeCost(transformed))
rs = [i-j for i,j in list(zip(lst_levels, lst_levels[1:]))]
for i, j in zip(rs, rs[1:]):
if i - j < j:
print(rs.index(i))
kmeans = KMeans(k=rs.index(i) + 3, maxIter=50)
model = kmeans.fit(transformed)
break
Wall time: 1min 32s
配置:
.config("spark.sql.pivotMaxValues", "100000") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.config("spark.sql.shuffle.partitions", "4") \
.config("spark.sql.inMemoryColumnarStorage.batchSize", "1000") \
【问题讨论】:
-
++ 我还想研究如何处理 Spark 中的多列数据帧(通过 R 的 sparklyr),特别是在处理经典情感分析时,文本单元是否编码在一个巨大的逻辑矩阵中.
-
在github上添加了一个与这个问题相关的问题:github.com/rstudio/sparklyr/issues/1322
标签: python pandas apache-spark machine-learning pyspark