【问题标题】:Proper handling of spark broadcast variables in a Python class在 Python 类中正确处理 spark 广播变量
【发布时间】:2015-09-15 00:53:57
【问题描述】:

我一直在通过 python 类实现一个带有 spark 的模型。在类中定义的 RDD 上调用类方法时我有些头疼(有关详细信息,请参阅this question),但最终取得了一些进展。这是我正在使用的类方法的示例:

@staticmethod
def alpha_sampler(model):

    # all the variables in this block are numpy arrays or floats
    var_alpha = model.params.var_alpha
    var_rating = model.params.var_rating
    b = model.params.b
    beta = model.params.beta
    S = model.params.S
    Z = model.params.Z
    x_user_g0_inner_over_var = model.x_user_g0_inner_over_var

    def _alpha_sampler(row):
        feature_arr = row[2]
        var_alpha_given_rest = 1/((1/var_alpha) + feature_arr.shape[0]*(1/var_rating))
        i = row[0]
        items = row[1]
        O = row[3] - np.inner(feature_arr,b) - beta[items] - np.inner(S[i],Z[items])
        E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var[i] + O.sum()/var_rating)
        return np.random.normal(E_alpha_given_rest,np.sqrt(var_alpha_given_rest))
    return _alpha_sampler

如您所见,为了避免序列化错误,我定义了一个静态方法,该方法返回一个函数,该函数依次应用于 RDD 的每一行(model 是这里的父类,它是从另一个内部调用的model)的方法:

# self.grp_user is the RDD
self.params.alpha = np.array(self.grp_user.map(model.alpha_sampler(self)).collect())

现在,这一切正常,但根本没有利用 Spark 的广播变量。理想情况下,我在此函数中传递的所有变量(var_alpha、beta、S 等)都可以首先广播给工作人员,这样我就不会将它们作为map 的一部分重复传递。但我不知道该怎么做。

然后,我的问题如下:我应该如何/在哪里将它们变成广播变量,以便它们可用于我映射到 grp_useralpha_sampler 函数?我相信会起作用的一件事是让它们成为全球性的,例如

global var_alpha
var_alpha = sc.broadcast(model.params.var_alpha)
# and similarly for the other variables...

那么 alpha_sampler 可以大大简化:

@staticmethod
def _alpha_sampler(row):
    feature_arr = row[2]
    var_alpha_given_rest = 1/((1/var_alpha.value) + feature_arr.shape[0]*(1/var_rating.value))
    i = row[0]
    items = row[1]
    O = row[3] - np.inner(feature_arr,b.value) - beta.value[items] - np.inner(S.value[i],Z.value[items])
    E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var.value[i] + O.sum()/var_rating.value)
    return np.random.normal(E_alpha_given_rest,np.sqrt(var_alpha_given_rest))

当然,我想避免这种使用全局变量非常危险的方法。有没有更好的方法可以让我利用广播变量?

【问题讨论】:

    标签: python apache-spark


    【解决方案1】:

    假设您在此处使用的变量只是标量,从性能角度来看,这里可能没有任何好处,并且使用广播变量会使您的代码可读性降低,但您可以将广播变量作为参数传递给静态方法:

    class model(object):
        @staticmethod
        def foobar(a_model, mu):
            y = a_model.y
            def _foobar(x):
                return x - mu.value + y 
            return _foobar
    
        def __init__(self, sc):
            self.sc = sc
            self.y = -1
            self.rdd = self.sc.parallelize([1, 2, 3])
    
        def get_mean(self):
            return self.rdd.mean()
    
        def run_foobar(self):
            mu = self.sc.broadcast(self.get_mean())
            self.data = self.rdd.map(model.foobar(self, mu))
    

    或在那里初始化它:

    class model(object):
        @staticmethod
        def foobar(a_model):
            mu = a_model.sc.broadcast(a_model.get_mean())
            y = a_model.y
            def _foobar(x):
                return x - mu.value + y 
            return _foobar
    
        def __init__(self, sc):
            self.sc = sc
            self.y = -1
            self.rdd = self.sc.parallelize([1, 2, 3])
    
        def get_mean(self):
            return self.rdd.mean()
    
        def run_foobar(self):
            self.data = self.rdd.map(model.foobar(self))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-08-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多