【问题标题】:spark pass data from one map function to anotherspark将数据从一个映射函数传递到另一个
【发布时间】:2017-10-28 01:40:35
【问题描述】:

我的 Spark 工作流程中有两个步骤

第一个函数接受一个 RDD 并吐出数字的平方。

输入:- [1,2,3,4,5]

第一步输出:- [1,4,9,16,25]

rdd = spark.sparkContext.parallelize([1,2,3,4,5],2) 
rdd = rdd.map(square_func)  # call the image_chunk_func

def square_func(x): 返回 x*x

我的第二步基本上应该取第一步的输出,找到立方根。

如何收集第 1 步的输出并将其传递给第 2 步。

我应该 rdd.collect 并吐到第 2 步吗?

Python 函数做多个地图功能。

>>> rdd = sc.parallelize([1,2,3,4,5])
>>> result = rdd.map(sqr).map(cubex)
>>> rdd.collect()
[1, 2, 3, 4, 5]

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    您可以向现有计算图添加另一个转换:

    scala> val rdd = sc.parallelize(List(1.0, 2.0, 3.0))
    
    scala> val cbrtRdd = rdd.map(square).map(math.cbrt)
    

    到目前为止,没有任何操作发生。如果您执行 action,您只需声明您想要完成的操作。

    collectsave 是动作示例。它们执行指定的操作并将结果收集到驱动程序内存中或将它们分别写入磁盘。

    scala> cbrtRdd.collect
    res8: Array[Double] = Array(1.0, 1.5874010519681996, 2.080083823051904)
    

    您可能会发现了解 Spark 中的转换和操作很有用。

    (我使用 scala 进行这些操作,但概念成立)

    【讨论】:

    • 我接受你的回答,但是一个普遍的问题....我可以像这样分离函数吗:- rdd.map(square) --> 收集输入 -->rdd.parallelize([第一个函数的输出],4)--> rdd.map(math.cbrt)...ie;多个并行化..
    • @user1050619 是的,你可以。一种更可扩展的方法是:输入->平方->将平方输出写入磁盘->从磁盘读取平方输出->立方根->将平方输出写入磁盘。我假设您的数据集足够大,可以担心管道是否会扩展(如果不是,最好不要使用 Spark)。
    猜你喜欢
    • 2018-07-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-29
    • 1970-01-01
    • 2022-01-13
    • 1970-01-01
    相关资源
    最近更新 更多