【问题标题】:PySpark RDD Sparse Matrix multiplication from scala to pythonPySpark RDD稀疏矩阵乘法从scala到python
【发布时间】:2026-01-08 05:20:06
【问题描述】:

我之前发布了一个关于 900 万行和 85K 列的坐标矩阵乘法的问题。 Errors for block matrix multiplification in Spark

但是,我在 DataProc 上遇到了内存不足的问题。我尝试为集群配置高内存核心,但没有成功。

我正在阅读这篇文章,并认为它可能对我的情况有所帮助: https://www.balabit.com/blog/scalable-sparse-matrix-multiplication-in-apache-spark/ 但是,他们提供的解决方案是在我不熟悉的 Scala 中提供的。有人可以将此代码翻译成python吗?非常感谢!

def coordinateMatrixMultiply(leftMatrix: CoordinateMatrix, rightMatrix: CoordinateMatrix): 
    CoordinateMatrix = {
    val M_ = leftMatrix.entries.map({ case MatrixEntry(i, j, v) => (j, (i, v)) })
    val N_ = rightMatrix.entries.map({ case MatrixEntry(j, k, w) => (j, (k, w)) })

    val productEntries = M_
    .join(N_)
    .map({ case (_, ((i, v), (k, w))) => ((i, k), (v * w)) })
    .reduceByKey(_ + _)
    .map({ case ((i, k), sum) => MatrixEntry(i, k, sum) })

    new CoordinateMatrix(productEntries)
}

【问题讨论】:

    标签: python scala pyspark matrix-multiplication google-cloud-dataproc


    【解决方案1】:

    Python 3.x 的实现

    1. 由于在 Python 3 中 lambda 函数中没有元组解包,我们必须通过单个变量 e 引用 MatrixEntry
    2. 此外,MatrixEntry 不可索引,因此我们必须调用各个属性 ijvalue
    def coordinateMatrixMultiply(leftmatrix, rightmatrix):
        left  =  leftmatrix.entries.map(lambda e: (e.j, (e.i, e.value)))
        right = rightmatrix.entries.map(lambda e: (e.i, (e.j, e.value)))
        productEntries = left \
            .join(right) \
            .map(lambda e: ((e[1][0][0], e[1][1][0]), (e[1][0][1]*e[1][1][1]))) \
            .reduceByKey(lambda x,y: x+y) \
            .map(lambda e: (*e[0], e[1]))
        return productEntries
    

    【讨论】:

      【解决方案2】:

      也快,只需要在插入前转换成rdd即可。

       def coordinateMatrixMultiply(leftmatrix, rightmatrix):
              left = leftmatrix.map(lambda (i, j, v): (j, (i, v)))
              right = rightmatrix.map(lambda (j, k, w): (j, (k, w)))
              productEntries = left   \
                              .join(right)    \
                              .map(lambda (x, ((i, v), (k, w))): ((i, k), (v * w)))   \
                              .reduceByKey(lambda x,y: x+y)   \
                              .map(lambda ((i, k), sum): (i, k, sum))
              return productEntries
      

      【讨论】:

      • 这在开箱即用的 PySpark 2.4.4./Python 3.7 中不起作用。