【发布时间】:2015-09-15 00:53:57
【问题描述】:
我一直在通过 python 类实现一个带有 spark 的模型。在类中定义的 RDD 上调用类方法时我有些头疼(有关详细信息,请参阅this question),但最终取得了一些进展。这是我正在使用的类方法的示例:
@staticmethod
def alpha_sampler(model):
# all the variables in this block are numpy arrays or floats
var_alpha = model.params.var_alpha
var_rating = model.params.var_rating
b = model.params.b
beta = model.params.beta
S = model.params.S
Z = model.params.Z
x_user_g0_inner_over_var = model.x_user_g0_inner_over_var
def _alpha_sampler(row):
feature_arr = row[2]
var_alpha_given_rest = 1/((1/var_alpha) + feature_arr.shape[0]*(1/var_rating))
i = row[0]
items = row[1]
O = row[3] - np.inner(feature_arr,b) - beta[items] - np.inner(S[i],Z[items])
E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var[i] + O.sum()/var_rating)
return np.random.normal(E_alpha_given_rest,np.sqrt(var_alpha_given_rest))
return _alpha_sampler
如您所见,为了避免序列化错误,我定义了一个静态方法,该方法返回一个函数,该函数依次应用于 RDD 的每一行(model 是这里的父类,它是从另一个内部调用的model)的方法:
# self.grp_user is the RDD
self.params.alpha = np.array(self.grp_user.map(model.alpha_sampler(self)).collect())
现在,这一切正常,但根本没有利用 Spark 的广播变量。理想情况下,我在此函数中传递的所有变量(var_alpha、beta、S 等)都可以首先广播给工作人员,这样我就不会将它们作为map 的一部分重复传递。但我不知道该怎么做。
然后,我的问题如下:我应该如何/在哪里将它们变成广播变量,以便它们可用于我映射到 grp_user 的 alpha_sampler 函数?我相信会起作用的一件事是让它们成为全球性的,例如
global var_alpha
var_alpha = sc.broadcast(model.params.var_alpha)
# and similarly for the other variables...
那么 alpha_sampler 可以大大简化:
@staticmethod
def _alpha_sampler(row):
feature_arr = row[2]
var_alpha_given_rest = 1/((1/var_alpha.value) + feature_arr.shape[0]*(1/var_rating.value))
i = row[0]
items = row[1]
O = row[3] - np.inner(feature_arr,b.value) - beta.value[items] - np.inner(S.value[i],Z.value[items])
E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var.value[i] + O.sum()/var_rating.value)
return np.random.normal(E_alpha_given_rest,np.sqrt(var_alpha_given_rest))
当然,我想避免这种使用全局变量非常危险的方法。有没有更好的方法可以让我利用广播变量?
【问题讨论】:
标签: python apache-spark