【Question Title】: Statistics.corr on a very large RDD[Vector] in Spark hits the generated-code size limit: how to fix?
【Posted】: 2025-12-09 13:35:01
【Question Description】:

I have a DataFrame with an arbitrarily large number of rows, created by doing something like the following:

// pivot data to wide format
val wide = df.groupBy("id").pivot("ip").sum("msgs")

// drop columns and fill in null values
val dfmat = wide.drop("id").na.fill(0)
val dimnames = dfmat.columns

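I don't know in advance how many distinct "ip" values there will be. I then try to take each row of dfmat and build an RDD[Vector] to use with org.apache.spark.mllib.stat.Statistics.corr. To do that I run the following, which produces an error: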

// try a different mapping
val mat = dfmat.rdd.map(row => Vectors.parse(row.mkString("[",",","]")))

// create correlation matrix
val correlMatrix: Matrix = Statistics.corr(mat, "pearson")
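As a side note on the mapping step only: building each vector directly from the row values avoids formatting every row as a string and parsing it back. The sketch below is a minimal illustration, not a fix for the error itself. The helper name rowsToVectors is hypothetical, and it assumes every column of the DataFrame is numeric.

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Hypothetical helper (not part of Spark): turn each row of an
// all-numeric DataFrame into a dense MLlib vector without going
// through a string representation.
def rowsToVectors(df: DataFrame): RDD[Vector] =
  df.rdd.map { row =>
    val values = row.toSeq.map {
      case null      => 0.0              // treat SQL NULL as zero
      case n: Number => n.doubleValue()  // Long, Int, Double, ... widened to Double
      case other     => sys.error(s"non-numeric value: $other")
    }
    Vectors.dense(values.toArray)
  }
```

With the names from the question this would be called as rowsToVectors(dfmat). It still has to evaluate the wide pivoted DataFrame, so it is not expected to change how much code Spark generates for the pivot.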

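This pipeline works on small datasets (1 million records or fewer) but fails on the full dataset. I also get an extremely large amount of log output containing strange entries such as: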

/* 125222 */     this.value_8326 = -1L;
/* 125223 */     this.isNull_8327 = true;
/* 125224 */     this.value_8327 = -1L;
/* 125225 */     this.isNull_8328 = true;
/* 125226 */     this.value_8328 = -1L;
/* 125227 */     this.isNull_8329 = true;
/* 125228 */     this.value_8329 = -1L;
/* 125229 */     this.isNull_8330 = true;
/* 125230 */     this.value_8330 = -1L;
/* 125231 */     this.isNull_8331 = true;
/* 125232 */     this.value_8331 = -1L;
/* 125233 */     this.isNull_8332 = true;
/* 125234 */     this.value_8332 = -1L;
/* 125235 */     this.isNull_8333 = true;
/* 125236 */     this.value_8333 = -1L;
/* 125237 */   }
/* 125238 */
/* 125239 */   public org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection target(org.apache.spark.sql.catalyst.expressions.MutableRow row) {
/* 125240 */     mutableRow = row;
/* 125241 */     return this;
/* 125242 */   }
/* 125243 */
/* 125244 */   /* Provide immutable access to the last projected row. */
/* 125245 */   public InternalRow currentValue() {
/* 125246 */     return (InternalRow) mutableRow;
/* 125247 */   }
/* 125248 */
/* 125249 */   public java.lang.Object apply(java.lang.Object _i) {
/* 125250 */     InternalRow i = (InternalRow) _i;
/* 125251 */     apply16668_0(i);
/* 125252 */     apply16668_1(i);
/* 125253 */     apply16668_2(i);
/* 125254 */     apply16668_3(i);
/* 125255 */     apply16668_4(i);
/* 125256 */     apply16668_5(i);
/* 125257 */     apply16668_6(i);
/* 125258 */     apply16668_7(i); 
/* 125259 */     apply16668_8(i);
/* 125260 */     apply16668_9(i);
/* 125261 */     apply16668_10(i);
/* 125262 */     apply16668_11(i);
/* 125263 */     apply16668_12(i);
/* 125264 */     apply16668_13(i);
/* 125265 */     apply16668_14(i);
/* 125266 */     apply16668_15(i);
/* 125267 */     apply16668_16(i);
/* 125268 */     apply16668_17(i);
/* 125269 */     apply16668_18(i);
/* 125270 */     // copy all the results into MutableRow
/* 125271 */     apply16669_0(i); 
/* 125272 */     apply16669_1(i);
/* 125273 */     apply16669_2(i);
/* 125274 */     apply16669_3(i);
/* 125275 */     apply16669_4(i);
/* 125276 */     apply16669_5(i);
/* 125277 */     apply16669_6(i);
/* 125278 */     apply16669_7(i);
/* 125279 */     apply16669_8(i);
/* 125280 */     apply16669_9(i);
/* 125281 */     apply16669_10(i);
/* 125282 */     apply16669_11(i);
/* 125283 */     apply16669_12(i);
/* 125284 */     apply16669_13(i);
/* 125285 */     apply16669_14(i);
/* 125286 */     apply16669_15(i);
/* 125287 */     apply16669_16(i);
/* 125288 */     apply16669_17(i);
/* 125289 */     apply16669_18(i);
/* 125290 */     apply16669_19(i);
/* 125291 */     apply16669_20(i);
/* 125292 */     apply16669_21(i);
/* 125293 */     apply16669_22(i);
/* 125294 */     apply16669_23(i);
/* 125295 */     return mutableRow;
/* 125296 */   }
/* 125297 */ }
/* 125298 */

And finally:

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:555)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:575)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:572)
    at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    ... 31 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Lorg/apache/spark/sql/catalyst/expressions/Expression;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB
    at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
    at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
    at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959)
    at org.codehaus.janino.UnitCompiler.writeConstantFieldrefInfo(UnitCompiler.java:10279)
    at org.codehaus.janino.UnitCompiler.putfield(UnitCompiler.java:9956)
    at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5086)
    at org.codehaus.janino.UnitCompiler.access$11800(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$15.visitFieldAccess(UnitCompiler.java:5062)
    at org.codehaus.janino.Java$FieldAccess.accept(Java.java:3235)
    at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
    at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5095)
    at org.codehaus.janino.UnitCompiler.access$11900(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$15.visitFieldAccessExpression(UnitCompiler.java:5063)
    at org.codehaus.janino.Java$FieldAccessExpression.accept(Java.java:3563)
    at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2675)
    at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619)
    at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643)
    at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:518)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
    at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
    at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
    at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
    at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:550)
    ... 35 more

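This looks like an error caused by automatic code generation, but I'm not sure what is going on. Any ideas on how to debug this, or on how to do this differently, would be appreciated. If there is no other suitable way to do the same thing, how can I shrink the generated code below the limit? Can I change the limit?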

Thanks,

【Comments】:

  • It occurred to me that this problem may be unrelated to the title: is the error caused by converting the DataFrame to RDD[Vector] (val mat = ...) or by computing Statistics.corr(mat)?
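One way to tell the two steps apart is to force the RDD to be computed before calling corr, since RDD transformations are lazy and nothing runs until an action is invoked. The following is a rough sketch under that assumption. The helper name corrInTwoSteps is hypothetical.

```scala
import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

// Hypothetical helper: materialize the input first, then compute the
// correlation, so that a failure can be attributed to one step.
def corrInTwoSteps(mat: RDD[Vector]): Matrix = {
  mat.cache()
  // count() is an action: it runs everything upstream of `mat`
  // (in the question: the pivot, the null fill and the row mapping).
  val n = mat.count()
  println(s"materialized $n rows")
  // If the exception only appears from here on, corr itself is involved.
  Statistics.corr(mat, "pearson")
}
```

If the exception is already thrown by count(), the problem lies in building the wide DataFrame or converting it, not in Statistics.corr.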

Tags: scala apache-spark


【Solution 1】:

Why pivot at all? It is an expensive and highly inefficient operation. Just build the matrix from the data you already have.

First, let's aggregate your data:

import org.apache.spark.sql.functions.{col, sum}

val cols = Seq("id", "ip")

val aggregated = df.groupBy(cols.map(col(_)): _*).agg(sum($"msgs").alias("msgs"))

Index the required columns:

import org.apache.spark.ml.feature.StringIndexer

val cols = Seq("id", "ip")

val indexers = cols.map(c => 
  new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx").fit(aggregated)
)

val indexed = indexers.foldLeft(aggregated)((d, t) => t.transform(d)).select(
  cols.map(c => col(s"${c}_idx").cast("long")) :+ $"msgs".cast("double"): _*
)

Create a matrix:

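import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.Row

// Each (id_idx, ip_idx, msgs) row becomes one sparse matrix entry.
// Going through .rdd also works on Spark 2.x+, where DataFrame.map needs an Encoder.
val rows = new CoordinateMatrix(
  indexed.rdd.map { case Row(i: Long, j: Long, v: Double) => MatrixEntry(i, j, v) }
).toRowMatrix.rows

Statistics.corr(rows, "pearson")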
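The resulting correlation matrix is indexed by the numeric "ip" indices, not by the original values. To recover something like the dimnames from the question, the labels stored in the fitted StringIndexerModel can be used. Below is a minimal sketch, assuming the matrix columns follow the "ip" index order produced above. The helper name labelledCorrelations is hypothetical, and newer Spark versions also offer labelsArray alongside labels.

```scala
import org.apache.spark.ml.feature.StringIndexerModel
import org.apache.spark.mllib.linalg.Matrix

// Hypothetical helper: pair every above-diagonal correlation with the
// two original "ip" values it belongs to.
def labelledCorrelations(
    corr: Matrix,
    ipIndexer: StringIndexerModel): Seq[(String, String, Double)] = {
  val labels = ipIndexer.labels // labels(k) is the original value for index k
  for {
    i <- 0 until corr.numRows
    j <- (i + 1) until corr.numCols
  } yield (labels(i), labels(j), corr(i, j))
}
```

With the names from this answer, a call could look like labelledCorrelations(Statistics.corr(rows, "pearson"), indexers(1)), since "ip" is the second entry of cols.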

【Comments】:

  • Thank you. This definitely resolves the error I was hitting above, and is in general a better way to construct the matrix. Much appreciated.