【问题标题】:Performance decrease for huge amount of columns. Pyspark大量列的性能下降。派斯帕克
【发布时间】:2018-07-30 13:26:11
【问题描述】:

我在处理 spark 宽数据帧(大约 9000 列,有时更多)时遇到了问题。
任务:

  1. 通过 groupBy 和 pivot 创建宽 DF。
  2. 将列转换为向量并从 pyspark.ml 处理为 KMeans。

所以我制作了扩展框架并尝试使用 VectorAssembler 创建向量,将其缓存并对其进行 KMeans 训练。
在我的电脑上,在独立模式下,对于大约 500x9000 的帧,组装 7 个不同数量的集群大约需要 11 分钟,而 KMeans 需要 2 分钟。另一方面,pandas 中的这种处理(pivot df,并迭代 7 个集群)只需不到一分钟。
显然我理解独立模式和缓存等的开销和性能下降,但这真的让我气馁。
有人可以解释我如何避免这种开销吗?
人们如何使用宽 DF 而不是使用矢量汇编程序并降低性能?
更正式的问题(针对软规则)听起来像 - 我怎样才能加快这段代码的速度?

%%time
tmp = (df_states.select('ObjectPath', 'User', 'PropertyFlagValue')
       .groupBy('User')
       .pivot('ObjectPath')
       .agg({'PropertyFlagValue':'max'})
       .fillna(0))
ignore = ['User']
assembler = VectorAssembler(
    inputCols=[x for x in tmp.columns if x not in ignore],
    outputCol='features')
Wall time: 36.7 s

print(tmp.count(), len(tmp.columns))
552, 9378

%%time
transformed = assembler.transform(tmp).select('User', 'features').cache()
Wall time: 10min 45s

%%time
lst_levels = []
for num in range(3, 14):
    kmeans = KMeans(k=num, maxIter=50)
    model = kmeans.fit(transformed)
    lst_levels.append(model.computeCost(transformed))
rs = [i-j for i,j in list(zip(lst_levels, lst_levels[1:]))]
for i, j in zip(rs, rs[1:]):
    if i - j < j:
        print(rs.index(i))
        kmeans = KMeans(k=rs.index(i) + 3, maxIter=50)
        model = kmeans.fit(transformed)
        break
 Wall time: 1min 32s

配置:

.config("spark.sql.pivotMaxValues", "100000") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.config("spark.sql.shuffle.partitions", "4") \
.config("spark.sql.inMemoryColumnarStorage.batchSize", "1000") \

【问题讨论】:

  • ++ 我还想研究如何处理 Spark 中的多列数据帧(通过 R 的 sparklyr),特别是在处理经典情感分析时,文本单元是否编码在一个巨大的逻辑矩阵中.
  • 在github上添加了一个与这个问题相关的问题:github.com/rstudio/sparklyr/issues/1322

标签: python pandas apache-spark machine-learning pyspark


【解决方案1】:

实际上在map 中找到了rdd 的解决方案。

  1. 首先我们要创建值映射。
  2. 同时提取所有不同的名称。
  3. 倒数第二步,我们在名称字典中搜索行映射的每个值,如果没有找到则返回值或 0。
  4. 矢量汇编器处理结果。

优点:

  1. 您不必创建具有大量列数的宽数据帧,从而避免开销。 (速度从 11 分钟提高到 1 分钟。)
  2. 您仍然在集群上工作并在 spark 范例中执行您的代码。

代码示例:scala implementation

【讨论】:

【解决方案2】:

VectorAssembler's transform function 处理所有列并在每个列上存储除原始数据之外的元数据。这需要时间,也占用 RAM。

要准确了解增加了多少,您可以将转换前后的数据框转储为镶木地板文件并进行比较。根据我的经验,与 VectorAssembler 构建的特征向量相比,手工构建的特征向量或其他特征提取方法可能会导致大小增加 10 倍,这是针对只有 10 个参数的逻辑回归。如果数据集包含尽可能多的列,情况会变得更糟。

一些建议:

  • 看看您是否可以通过其他方式构建特征向量。我不确定这在 Python 中的性能如何,但我在 Scala 中从这种方法中获得了很多成果。对于手动构建的向量或使用其他提取方法 (TF-IDF) 而不是 VectorAssembled 构建的向量,我注意到比较逻辑回归(10 个参数)的性能差异类似于 5x-6x。
  • 看看是否可以重塑数据以减少需要 VectorAssembler 处理的列数。
  • 看看增加 Spark 可用的 RAM 是否有帮助。

【讨论】:

  • 看起来不像是一个答案,但赏金时间到了。
  • @fny,我希望看到更好的解释。
  • 我已经发布了一个尝试澄清。
猜你喜欢
  • 1970-01-01
  • 2021-07-07
  • 2021-07-25
  • 2023-04-02
  • 1970-01-01
  • 2020-04-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多