【发布时间】:2018-02-21 00:30:29
【问题描述】:
在我的应用程序代码库中集成 pyspark 时,我无法在 RDD 的 map 方法中引用类的方法。我用一个简单的例子重复了这个问题,如下
这是我定义的一个虚拟类,它只是将一个数字添加到从作为类属性的 RDD 派生的 RDD 的每个元素中:
class Test:
def __init__(self):
self.sc = SparkContext()
a = [('a', 1), ('b', 2), ('c', 3)]
self.a_r = self.sc.parallelize(a)
def add(self, a, b):
return a + b
def test_func(self, b):
c_r = self.a_r.map(lambda l: (l[0], l[1] * 2))
v = c_r.map(lambda l: self.add(l[1], b))
v_c = v.collect()
return v_c
test_func() 在 RDD v 上调用 map() 方法,而后者又在 v 的每个元素上调用 add() 方法。调用test_func() 会抛出以下错误:
pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
现在,当我将 add() 方法移出类时,例如:
def add(self, a, b):
return a + b
class Test:
def __init__(self):
self.sc = SparkContext()
a = [('a', 1), ('b', 2), ('c', 3)]
self.a_r = self.sc.parallelize(a)
def test_func(self, b):
c_r = self.a_r.map(lambda l: (l[0], l[1] * 2))
v = c_r.map(lambda l: add(l[1], b))
v_c = v.collect()
return v_c
现在调用test_func() 可以正常工作。
[7, 9, 11]
为什么会这样?如何将类方法传递给 RDD 的 map() 方法?
【问题讨论】: