【问题标题】:Spark matrix multiplication code takes a lot of time to executeSpark矩阵乘法代码需要大量时间来执行
【发布时间】:2018-07-30 16:44:17
【问题描述】:

我在 Spyder 上使用 findspark.init() 设置了一个简单的 PySpark 环境,并且我在 localhost 上运行代码。我对在 Spark 中使用 BlockMatrix 进行简单的矩阵乘法如何花费数小时和数小时的时间感到困惑,而在 numpy 上运行相同的代码需要几分钟。

这是我正在使用的代码:

import numpy as np
import pandas as pd
from sklearn import cross_validation as cv
import itertools
import random
import findspark
import time
start=time.time()

findspark.init()

from pyspark.mllib.linalg.distributed import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('myapp')

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

from pyspark.mllib.linalg.distributed import *

def as_block_matrix(rdd, rowsPerBlock=1024, colsPerBlock=1024):
    return IndexedRowMatrix(
        rdd.zipWithIndex().map(lambda xi: IndexedRow(xi[1], xi[0]))
    ).toBlockMatrix(rowsPerBlock, colsPerBlock)

def prediction(P,Q):
#    np.r_[ pp,np.zeros(len(pp)) ].reshape(2,20)
    Pn=np.r_[ P,np.zeros(len(P)),np.zeros(len(P)),np.zeros(len(P)),np.zeros(len(P)) ].reshape(5,len(P))
    Qn=np.r_[ Q,np.zeros(len(Q)),np.zeros(len(Q)),np.zeros(len(Q)),np.zeros(len(Q)) ].reshape(5,len(Q))
    A = Pn[:1]
    B = Qn[:1].T
    distP = sc.parallelize(A)
    distQ = sc.parallelize(B)
    mat=as_block_matrix(distP).multiply(as_block_matrix(distQ))
    blocksRDD = mat.blocks
    m=(list(blocksRDD.collect())[0][1])
    #print(m)
    return m.toArray()[0,0]

for epoch in range(1):
    for u, i in zip(users,items):
        e = R[u, i] - prediction(P[:,u],Q[:,i]) 

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    不知道矩阵的大小会使回答这个问题变得更加困难,但是如果您使用的是高维稀疏矩阵,那么 pyspark 进行矩阵乘法的方式可能会出现一个问题。为了将稀疏矩阵相乘,pyspark 将稀疏矩阵转换为密集矩阵。这在文档中有所说明:

    http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.BlockMatrix.multiply

    其中指出:

    multiply(other) Left 将此 BlockMatrix 与另一个 BlockMatrix 相乘。这个矩阵的 colsPerBlock 必须等于其他的 rowsPerBlock。如果 other 包含任何 SparseMatrix 块,则必须将它们转换为 DenseMatrix 块。输出 BlockMatrix 将仅包含 DenseMatrix 块。在添加对两个稀疏矩阵相乘的支持之前,这可能会导致一些性能问题。

    据我所知,如果您打算使用内置矩阵数据类型,则没有很好的解决方法。一种解决方法是放弃矩阵数据类型并使用 rdd 或数据框连接操作手动滚动您自己的矩阵乘法。例如,如果您可以使用数据框,则以下内容已经过测试并且在规模上运行良好:

    from pyspark.sql.functions import sum
    
    def multiply_df_matrices(A,B):
        return A.join(B,A['column']==B['row'])\
                .groupBy(A['row'],B['column'])\
                .agg(sum(A['value']*B['value']).alias('value'))
    

    你可以通过加入两个 rdds 来做类似的事情。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-03-03
      • 1970-01-01
      • 2018-05-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多