【问题标题】:Large matrix operations: Multiplication in Scala/Apache Spark大型矩阵运算:Scala/Apache Spark 中的乘法
【发布时间】:2015-04-07 20:39:20
【问题描述】:

我需要将两个大矩阵相乘,XY。通常X 有~500K 行和~18K 列,Y 有~18K 行和~18K 列。矩阵X 预计是稀疏的,矩阵Y 预计是稀疏/密集的。在 Scala/Apache Spark 中执行这种乘法的理想方法是什么?

【问题讨论】:

  • 这是一道数学题。您可以在math.stackexchange.com 询问
  • 稀疏到什么程度?
  • 一道数学题?呃……显然不是,这是关于 Scala 的。

标签: scala apache-spark sparse-matrix matrix-multiplication large-data


【解决方案1】:

我有一些代码给你。它将矩阵表示为列向量数组(这意味着数组中的每个条目都是一列,而不是一行)。两个 1000*1000 的矩阵相乘大约需要 0.7s。两个 10,000 * 10,000 矩阵需要 11 分钟。 20,000 * 20,000 为 1.5 小时,(500k*18k) 次 (18k*18k) 为 30 小时。但是如果你并行运行它(通过使用被注释掉的代码)它应该运行大约 2 到 3 倍(在 4 核 cpu 上)。但请记住,第一个矩阵中的列数始终必须与第二个矩阵中的行数相同。

class Matrix(val columnVectors: Array[Array[Double]]) {
  val columns = columnVectors.size
  val rows = columnVectors.head.size
  def *(v: Array[Double]): Array[Double] = {
    val newValues = Array.ofDim[Double](rows)
    var col = 0
    while(col < columns) {
      val n = v(col)
      val column = columnVectors(col)
      var row = 0
      while(row < newValues.size) {
        newValues(row) += column(row) * n
        row += 1
      }
      col += 1
    }
    newValues
  }
  def *(other: Matrix): Matrix = {
    //do the calculation on only one cpu
    new Matrix(other.columnVectors.map(col => this * col))

    //do the calculation in parallel on all available cpus
    //new Matrix(other.columnVectors.par.map(col => this * col).toArray)
  }
  override def toString = {
    columnVectors.transpose.map(_.mkString(", ")).mkString("\n")
  }
}

编辑:

好的,这是一个更好的版本。我现在将行向量而不是列向量存储在矩阵中。这使得在第一个矩阵稀疏的情况下优化乘法变得更容易。 我还使用迭代器添加了一个惰性版本的矩阵乘法。由于第一个矩阵是 500k * 18k = 90 亿个数字,所以这样一个惰性版本将允许您在不需要太多内存的情况下进行乘法运算。您只需要创建一个可以懒惰地读取行的迭代器,例如从数据库中提取数据,然后将生成的迭代器中的行写回。

import scala.collection.Iterator
import scala.util.{Random => rand}

def time[T](descr: String)(f: => T): T = {
  val start = System.nanoTime
  val r = f
  val end = System.nanoTime
  val time = (end - start)/1e6
  println(descr + ": time = " + time + "ms")
  r
}

object Matrix {
  def mulLazy(m1: Iterator[Array[Double]], m2: Matrix): Iterator[Array[Double]] = {
    m1.grouped(8).map { group =>
      group.par.map(m2.mulRow).toIterator
    }.flatten
  }
}

class Matrix(val rowVectors: Array[Array[Double]]) {
  val columns = rowVectors.head.size
  val rows = rowVectors.size

  private def mulRow(otherRow: Array[Double]): Array[Double] = {
    val rowVectors = this.rowVectors
    val result = Array.ofDim[Double](columns)
    var i = 0
    while(i < otherRow.size) {
      val value = otherRow(i)
      if(value != 0) { //optimization for sparse matrix
        val row = rowVectors(i)
        var col = 0
        while(col < result.size) {
          result(col) += value * row(col)
          col += 1
        }
      }
      i += 1
    }
    result
  }

  def *(other: Matrix): Matrix = {
    new Matrix(rowVectors.par.map(other.mulRow).toArray)
  }

  def equals(other: Matrix): Boolean = {
    java.util.Arrays.deepEquals(this.rowVectors.asInstanceOf[Array[Object]], other.rowVectors.asInstanceOf[Array[Object]])
  }

  override def equals(other: Any): Boolean = {
    if(other.isInstanceOf[Matrix]) equals(other.asInstanceOf[Matrix]) else false
  }

  override def toString = {
    rowVectors.map(_.mkString(", ")).mkString("\n")
  }
}

def randMatrix(rows: Int, columns: Int): Matrix = {  
  new Matrix((1 to rows).map(_ => Array.fill(columns)(rand.nextDouble * 100)).toArray)
}

def sparseRandMatrix(rows: Int, columns: Int, ratio: Double): Matrix = {
  new Matrix((1 to rows).map(_ => Array.fill(columns)(if(rand.nextDouble > ratio) 0 else rand.nextDouble * 100)).toArray)
}

val N = 2000

val m1 = sparseRandMatrix(N, N, 0.1) // only 10% of the numbers will be different from 0
val m2 = randMatrix(N, N)

val m3 = m1.rowVectors.toIterator

val m12 = time("m1 * m2")(m1 * m2)
val m32 = time("m3 * m2")(Matrix.mulLazy(m3, m2)) //doesn't take much time because the matrix multiplication is lazy

println(m32)

println("m12 == m32 = " + (new Matrix(m32.toArray) == m12))

【讨论】:

  • 考虑优化稀疏性。
  • 好的,我优化了代码。对于这两种情况,稀疏矩阵和巨型矩阵都太大而无法放入 ram。
  • @SpiderPig 辛苦了。你是 SO 社区的瑰宝。保持。但请考虑到您的时间也很宝贵。在回答此类问题之前,请确保 OP 已经付出了足够的努力,而不是试图不费吹灰之力就完成作业。
  • 如果将N增加到20,000,如何解决java.lang.OutOfMemoryError错误。我正在使用 -Xmx4g 作为编译器选项,但我仍然得到它。
  • 20,000 * 20,000 * 4 字节/int = 1.6 GB。您创建 3 个矩阵,每个矩阵的大小为 1.6 GB。您可以给 Java 更多内存,例如-Xmx8g 或者您无法通过使用 mulLazy 方法并将第一个矩阵表示为数组的迭代器而不是矩阵对象来将所有矩阵保留在 RAM 中。
猜你喜欢
  • 2013-08-29
  • 1970-01-01
  • 2016-04-18
  • 2018-01-27
  • 2011-05-02
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多