【Posted】: 2021-12-03 13:46:12
【Question】:
I recently noticed a significant difference between the results of covariance computation in Pandas and the MLlib equivalent. For fully specified inputs (i.e. without any NAs) the results are reasonably close, but they diverge substantially when values are missing. The Pandas source explains how NAs are treated, yet I cannot reproduce those results with Spark. I also could not find any documentation in the source on what exactly `RowMatrix().computeCovariance()` does with NAs — my Scala is mediocre at best and I am not familiar with BLAS, so I may have missed something. Since I am running a prebuilt macOS Spark setup, I was also unable to track down the cause of this BLAS warning:
WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
Given how important covariance is in many applications, can anyone shed light on how exactly missing values are handled in Apache Spark MLlib's covariance computation?
Edit:
Moreover, this is not addressed by the current Spark 3.2 release, which still raises `The method pd.DataFrame.cov() is not implemented yet` (pandas API on Spark).
Assume the following setup:
from pyspark.sql import SparkSession
from pyspark.mllib.linalg.distributed import RowMatrix
spark = SparkSession.builder.appName("MyApp") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.getOrCreate()
sc = spark.sparkContext
good_rows = sc.parallelize([[11, 12, 13, 14, 16, 17, 18],
[21, 22, 23, 42, 26, 27, 28],
[31, 32, 33, 34, 36, 37, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1, 2, 3, 4, 6, 7, 8]])
bad_rows = sc.parallelize([[11, 12, None, 14, 16, None, 18],
[21, 22, None, 42, 26, None, 28],
[31, 32, None, 34, 36, None, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1, 2, 3, 4, 6, 7, 8]])
The covariance computed from good_rows is identical for Pandas and Spark:
good_rows.toDF().toPandas().cov()
# Results in:
_1 _2 _3 _4 _5 _6 _7
_1 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_2 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_3 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_4 332.0 332.0 332.0 368.0 332.0 332.0 332.0
_5 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_6 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_7 350.0 350.0 350.0 332.0 350.0 350.0 350.0
spark.createDataFrame(RowMatrix(good_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 350.0 332.0 350.0 350.0 350.0
1 350.0 350.0 350.0 332.0 350.0 350.0 350.0
2 350.0 350.0 350.0 332.0 350.0 350.0 350.0
3 332.0 332.0 332.0 368.0 332.0 332.0 332.0
4 350.0 350.0 350.0 332.0 350.0 350.0 350.0
5 350.0 350.0 350.0 332.0 350.0 350.0 350.0
6 350.0 350.0 350.0 332.0 350.0 350.0 350.0
Running the same with bad_rows results in very different covariance matrices, unless Pandas' cov() is run with min_periods=(bad_rows.count()/2)+1:
bad_rows.toDF().toPandas().cov()
# Results in:
_1 _2 _3 _4 _5 _6 _7
_1 350.0 350.0 700.0 332.0 350.0 700.0 350.0
_2 350.0 350.0 700.0 332.0 350.0 700.0 350.0
_3 700.0 700.0 700.0 700.0 700.0 700.0 700.0
_4 332.0 332.0 700.0 368.0 332.0 700.0 332.0
_5 350.0 350.0 700.0 332.0 350.0 700.0 350.0
_6 700.0 700.0 700.0 700.0 700.0 700.0 700.0
_7 350.0 350.0 700.0 332.0 350.0 700.0 350.0
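The unexpected 700.0 entries fall out of Pandas' pairwise-complete treatment of NAs: for each pair of columns, cov() drops only the rows in which either of the two columns is missing, then computes the sample covariance over the rows that remain. The following sketch (plain pandas, with the `_1` … `_7` column names that `toPandas()` produces) reproduces the `_1`/`_3` entry by hand:

```python
import pandas as pd

# bad_rows as a pandas DataFrame; None becomes NaN
df = pd.DataFrame(
    [[11, 12, None, 14, 16, None, 18],
     [21, 22, None, 42, 26, None, 28],
     [31, 32, None, 34, 36, None, 38],
     [41, 42, 43,   44, 46, 47,   48],
     [51, 52, 53,   54, 56, 57,   58],
     [ 1,  2,  3,    4,  6,  7,    8]],
    columns=["_1", "_2", "_3", "_4", "_5", "_6", "_7"])

# Pairwise-complete: only the last three rows have both _1 and _3 present
sub = df[["_1", "_3"]].dropna()          # [[41, 43], [51, 53], [1, 3]]
manual = ((sub["_1"] - sub["_1"].mean())
          * (sub["_3"] - sub["_3"].mean())).sum() / (len(sub) - 1)

print(manual)                    # 700.0
print(df.cov().loc["_1", "_3"])  # 700.0, same value
```

So each 700.0 is a perfectly ordinary covariance, just computed over a different (smaller) set of rows than the complete-column entries.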
spark.createDataFrame(RowMatrix(bad_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 NaN 332.0 350.0 NaN 350.0
1 350.0 350.0 NaN 332.0 350.0 NaN 350.0
2 NaN NaN NaN NaN NaN NaN NaN
3 332.0 332.0 NaN 368.0 332.0 NaN 332.0
4 350.0 350.0 NaN 332.0 350.0 NaN 350.0
5 NaN NaN NaN NaN NaN NaN NaN
6 350.0 350.0 NaN 332.0 350.0 NaN 350.0
bad_rows.toDF().toPandas().cov(min_periods=(bad_rows.count()/2)+1)
# With 50% of dataframe rows +1 Pandas equals the Spark result:
_1 _2 _3 _4 _5 _6 _7
_1 350.0 350.0 NaN 332.0 350.0 NaN 350.0
_2 350.0 350.0 NaN 332.0 350.0 NaN 350.0
_3 NaN NaN NaN NaN NaN NaN NaN
_4 332.0 332.0 NaN 368.0 332.0 NaN 332.0
_5 350.0 350.0 NaN 332.0 350.0 NaN 350.0
_6 NaN NaN NaN NaN NaN NaN NaN
_7 350.0 350.0 NaN 332.0 350.0 NaN 350.0
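For comparison, the Spark output pattern is exactly what plain NaN propagation produces: any entry whose pair involves a column containing a NaN comes out NaN, while pairs of complete columns are unaffected. A NumPy sketch of the same data (no Spark required) shows the identical pattern:

```python
import numpy as np

bad = np.array(
    [[11, 12, np.nan, 14, 16, np.nan, 18],
     [21, 22, np.nan, 42, 26, np.nan, 28],
     [31, 32, np.nan, 34, 36, np.nan, 38],
     [41, 42, 43,     44, 46, 47,     48],
     [51, 52, 53,     54, 56, 57,     58],
     [ 1,  2,  3,      4,  6,  7,      8]])

# Columns as variables, N-1 normalization (np.cov default)
cov = np.cov(bad, rowvar=False)
print(cov[0, 0])  # 350.0 - pairs of complete columns are unaffected
print(cov[0, 2])  # nan   - any pair touching a NaN column propagates NaN
```

This suggests MLlib does no NA handling at all: the NaNs simply flow through the BLAS sums, rather than being dropped pairwise as in Pandas.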
I did try setting the Nones to 0 and to the column mean, but could not reproduce the MLlib covariance result with either of these standard imputations, see below.
# Zero NA fill:
zeroed_na_rows = sc.parallelize([[11, 12, 0, 14, 16, 0, 18],
[21, 22, 0, 42, 26, 0, 28],
[31, 32, 0, 34, 36, 0, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[1, 2, 3, 4, 6, 7, 8]])
spark.createDataFrame(RowMatrix(zeroed_na_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 379.0 332.0 350.0 391.0 350.0
1 350.0 350.0 379.0 332.0 350.0 391.0 350.0
2 379.0 379.0 606.7 319.6 379.0 646.3 379.0
3 332.0 332.0 319.6 368.0 332.0 324.4 332.0
4 350.0 350.0 379.0 332.0 350.0 391.0 350.0
5 391.0 391.0 646.3 324.4 391.0 690.7 391.0
6 350.0 350.0 379.0 332.0 350.0 391.0 350.0
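Note that this zero-fill output is simply the ordinary sample covariance (N-1 normalization) of the imputed matrix, which again points to computeCovariance() performing a plain covariance computation with no NA awareness. A NumPy cross-check of the 606.7 and 646.3 entries, assuming the zero-filled values above:

```python
import numpy as np

zeroed = np.array(
    [[11, 12,  0, 14, 16,  0, 18],
     [21, 22,  0, 42, 26,  0, 28],
     [31, 32,  0, 34, 36,  0, 38],
     [41, 42, 43, 44, 46, 47, 48],
     [51, 52, 53, 54, 56, 57, 58],
     [ 1,  2,  3,  4,  6,  7,  8]])

cov = np.cov(zeroed, rowvar=False)    # plain sample covariance
print(round(cov[2, 2], 1))  # 606.7, matching Spark's zero-fill output
print(round(cov[2, 5], 1))  # 646.3, likewise
```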
# Mean NA fill:
mean_rows = sc.parallelize([[11, 12, 27, 14, 16, 37, 18],
[21, 22, 27, 42, 26, 37, 28],
[31, 32, 27, 34, 36, 37, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1, 2, 3, 4, 6, 7, 8]])
spark.createDataFrame(RowMatrix(mean_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in (still different from Pandas .cov()):
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 298.0 332.0 350.0 280.0 350.0
1 350.0 350.0 298.0 332.0 350.0 280.0 350.0
2 298.0 298.0 290.8 287.2 298.0 280.0 298.0
3 332.0 332.0 287.2 368.0 332.0 280.0 332.0
4 350.0 350.0 298.0 332.0 350.0 280.0 350.0
5 280.0 280.0 280.0 280.0 280.0 280.0 280.0
6 350.0 350.0 298.0 332.0 350.0 280.0 350.0
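The mean-fill output likewise matches a plain sample covariance of the imputed matrix, so the remaining gap to Pandas' pairwise numbers is expected rather than a computation error: mean imputation shrinks the imputed column's variance, while Pandas simply drops the incomplete rows per pair. A quick NumPy check of the 290.8 and 280.0 entries (same data as above, mean-imputed):

```python
import numpy as np

# Column means over the present values: col 3 -> 27, col 6 -> 37
mean_filled = np.array(
    [[11, 12, 27, 14, 16, 37, 18],
     [21, 22, 27, 42, 26, 37, 28],
     [31, 32, 27, 34, 36, 37, 38],
     [41, 42, 43, 44, 46, 47, 48],
     [51, 52, 53, 54, 56, 57, 58],
     [ 1,  2,  3,  4,  6,  7,  8]])

cov = np.cov(mean_filled, rowvar=False)   # plain sample covariance
print(round(cov[2, 2], 1))  # 290.8, matching Spark's mean-fill output
print(cov[5, 5])            # 280.0, likewise
```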
If that is not it, what is going on here? And how can I get Spark MLlib to produce results reasonably close to Pandas?
Tags: python pandas apache-spark pyspark apache-spark-mllib