【发布时间】:2019-08-02 14:23:43
【问题描述】:
以下简单的 spark 程序需要 4 分钟才能运行。我不知道这段代码有什么问题。
首先,我生成了一个非常小的 rdd
D = spark.sparkContext.parallelize([(0,[1,2,3]),(1,[2,3]),(2,[0,3]),(3,[1])]).cache()
然后我生成一个向量
P1 = spark.sparkContext.parallelize(list(zip(list(range(4)),[1/4]*4))).cache()
然后我定义一个函数来执行map 步骤
def MyFun(x):
L0 = len(x[2])
L = []
for i in x[2]:
L.append((i,x[1]/L0))
return L
然后我执行下面的代码
P0 = P1
D0 = D.join(P1).map(lambda x: [x[0],x[1][1],x[1][0]]).cache()
C0 = D0.flatMap(lambda x: MyFun(x)).cache()
P1 = C0.reduceByKey(lambda x,y:x+y).mapValues(lambda x:x*1.2+3.4).sortByKey().cache()
Diff = P1.join(P0).map(lambda x: abs(x[1][0]-x[1][1])).sum()
鉴于我的数据太小,我无法弄清楚这段代码运行如此缓慢的原因......
【问题讨论】:
-
为什么要缓存每一步?这要花很多钱
-
@BlueSheepToken 我认为这将有助于加快进程......我想如果我不缓存,那么它会从磁盘加载,这会很慢......我猜我错了......也许我应该只缓存()
D?因为我以后会加入?我也试过删除'cache()'的情况,但它仍然运行得很慢...... -
缓存正在写入磁盘而不是在内存中执行所有操作,您唯一可以缓存的是
P1(我担心 D 从磁盘加载而不是重新生成它需要更长的时间) .之后,我强烈建议你使用 dataframe api 而不是 rdd,你不要利用 spark 在这里为你做一些优化。您是否尝试过使用 Spark UI 进行分析? -
@BlueSheepToken 谢谢!我会试试你的建议
-
@fixx 我会写这个作为答案!可能更清楚
标签: python apache-spark pyspark mapreduce