【问题标题】:RDD creation and variable bindingRDD创建和变量绑定
【发布时间】:2016-12-21 02:07:53
【问题描述】:

我有一个非常简单的代码:

def fun(x, n):
    return (x, n)

rdds = []
for i in range(2):
    rdd = sc.parallelize(range(5*i, 5*(i+1)))
    rdd = rdd.map(lambda x: fun(x, i))
    rdds.append(rdd)

a = sc.union(rdds)
print a.collect()

我预计输出如下:

[(0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]

但是,输出如下:

[(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]

至少可以这么说,这令人困惑。

看来,由于对 RDD 的惰性评估,用于创建 RDD 的 i 的值是它在调用 collect() 时所承载的值,即 1(来自 @987654326 的最后一次运行@循环)。

现在,元组的两个元素都派生自i

但是对于元组的第一个元素,i 的值似乎是 0 和 1,而对于元组的第二个元素 i 的值是 2。

有人能解释一下发生了什么吗?

谢谢。

【问题讨论】:

    标签: python apache-spark pyspark lazy-evaluation


    【解决方案1】:

    只是改变

    rdd = rdd.map(lambda x: fun(x, i))
    

    rdd = rdd.map(lambda x, i=i: (x, i))
    

    这只是关于Python的,看看这个

    https://docs.python.org/2.7/tutorial/controlflow.html#default-argument-values

    【讨论】:

    • 所以i=ii推入lambda函数的作用域,当它被调用时,会首先访问lambda函数的本地值。
    • @MohammadYusufGhazi 是的。默认值在定义范围内的函数定义点进行评估
    • range(2) 创建的list 中的01 整数对象的id 被分配给lambda 函数参数。该列表不会被垃圾回收,因为其中的项目仍被其他一些变量指向?
    【解决方案2】:

    sc.parallelize() 是一个会立即执行的动作。所以i 的值,即01 都将被使用。

    但在rdd.map() 的情况下,当您稍后调用collect() 时,只会使用i 的最后一个值。

    rdd = sc.parallelize(range(5*i, 5*(i+1)))
    rdd = rdd.map(lambda x: fun(x, i))
    

    这里 rdd.map 不会对 rdd 进行转换,它只会创建 DAG(Directed Acyclic Graph),即 lambda 函数不会应用于 rdd 的元素。

    当您调用 collect() 时,将调用 lambda 函数,但此时 i 的值为 1。如果您在调用 collect 之前重新分配 i=10,则将使用该值 i

    【讨论】:

    • 如果是这种情况,那么为什么值 0 和 1 用于元组的第一个元素,而在我的示例中,第二个元素只使用 1?谢谢。
    • @abhinavkulkarni 因为 sc.parallelize() 是一个将立即执行的动作,而 rdd.map() 是一个转换。如果您在循环中收集 rdd 然后附加到列表中,那么您将获得所需的结果。
    • @abhinavkulkarni 你采取张通的解决方案,他将i 的值推入lambda函数的范围。因此,当调用 lambda 函数时,它将首先使用它的本地值 i,然后再爬升到 i 的值已更改的外部范围。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多