【问题标题】:What is the difference between map and flatMap and a good use case for each?map 和 flatMap 有什么区别,每个都有一个好的用例?
【发布时间】:2014-04-16 12:58:10
【问题描述】:

有人可以向我解释一下 map 和 flatMap 之间的区别以及它们各自的好用例吗?

“扁平化结果”是什么意思? 有什么用?

【问题讨论】:

  • 由于您添加了 Spark 标签,我假设您在Apache Spark 中询问RDD.mapRDD.flatMap。一般来说,Spark 的 RDD 操作是根据其对应的 Scala 集合操作建模的。 stackoverflow.com/q/1059776/590203 中的答案讨论了 Scala 中 mapflatMap 之间的区别,可能对您有所帮助。
  • 这里的大多数示例似乎都假设 flatMap 仅对集合进行操作,但事实并非如此。

标签: apache-spark


【解决方案1】:

这是一个区别示例,作为spark-shell 会话:

首先,一些数据——两行文字:

val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue"))  // lines

rdd.collect

    res0: Array[String] = Array("Roses are red", "Violets are blue")

现在,map 将一个长度为 N 的 RDD 转换为另一个长度为 N 的 RDD。

例如,它从两行映射到两个行长:

rdd.map(_.length).collect

    res1: Array[Int] = Array(13, 16)

但是flatMap(松散地说)将长度为 N 的 RDD 转换为 N 个集合的集合,然后将它们展平为单个 RDD 的结果。

rdd.flatMap(_.split(" ")).collect

    res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")

我们每行有多个单词,多行,但我们最终得到一个单词输出数组

为了说明这一点,从行集合到单词集合的 flatMapping 如下所示:

["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]

因此,flatMap 的输入和输出 RDD 通常具有不同的大小。

如果我们尝试将mapsplit 函数一起使用,我们最终会得到嵌套结构(单词数组的RDD,类型为RDD[Array[String]]),因为我们必须只有一个结果每个输入:

rdd.map(_.split(" ")).collect

    res3: Array[Array[String]] = Array(
                                     Array(Roses, are, red), 
                                     Array(Violets, are, blue)
                                 )

最后,一个有用的特殊情况是与一个可能不返回答案的函数进行映射,因此返回一个Option。我们可以使用flatMap过滤掉返回None的元素,并从返回Some的元素中提取值:

val rdd = sc.parallelize(Seq(1,2,3,4))

def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None

rdd.flatMap(myfn).collect

    res3: Array[Int] = Array(10,20)

(请注意,Option 的行为更像是一个包含一个元素或零个元素的列表)

【讨论】:

  • 在 map 内调用 split 会给["a b c", "", "d"] =&gt; [["a","b","c"],[],["d"]]吗?
  • 是的 - (但请注意,我的非正式符号只是为了表示某种集合 - 实际上将 split 映射到字符串列表将产生一个数组列表)
  • 谢谢你写出来,这是我读过的最好的解释来区分相同的区别
【解决方案2】:

通常我们在 hadoop 中使用字数统计示例。我将采用相同的用例并使用mapflatMap,我们将看到它处理数据的方式的不同。

下面是示例数据文件。

hadoop is fast
hive is sql on hdfs
spark is superfast
spark is awesome

上面的文件将使用mapflatMap进行解析。

使用map

>>> wc = data.map(lambda line:line.split(" "));
>>> wc.collect()
[u'hadoop is fast', u'hive is sql on hdfs', u'spark is superfast', u'spark is awesome']

输入有 4 行,输出大小也是 4,即 N 个元素 ==> N 个元素。

使用flatMap

>>> fm = data.flatMap(lambda line:line.split(" "));
>>> fm.collect()
[u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome']

输出与地图不同。


让我们为每个键分配 1 作为值来获取字数。

  • fm:使用flatMap创建的RDD
  • wc:使用map 创建的RDD
>>> fm.map(lambda word : (word,1)).collect()
[(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)]

而 RDD wc 上的 flatMap 将给出以下不需要的输出:

>>> wc.flatMap(lambda word : (word,1)).collect()
[[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1]

如果使用map 而不是flatMap,则无法获得字数。

根据定义,mapflatMap 的区别是:

map: 它通过对每个元素应用给定的函数来返回一个新的 RDD 的RDD。 map 中的函数只返回一项。

flatMap:类似于map,通过应用一个函数返回一个新的RDD 到 RDD 的每个元素,但输出是扁平的。

【讨论】:

  • 我觉得这个答案比公认的答案更好。
  • 当您可以复制粘贴输出文本时,您究竟为什么要创建难以辨认的屏幕截图?
  • 所以 flatMap() 是 map() + "flatten",我知道它没有多大意义,但是我们可以在 map() 之后使用任何类型的“flatten”函数吗?
  • 您的代码有一个误导性的错字。 .map(lambda line:line.split(" ")) 的结果不是字符串数组。您应该将data.collect() 更改为wc.collect,您将看到一个数组数组。
  • 是的,但是命令的结果仍然是错误的。你跑wc.collect()了吗?
【解决方案3】:

归结为您最初的问题:扁平化是什么意思

当你使用 flatMap 时,一个“多维”集合变成了“一维”集合。

val array1d = Array ("1,2,3", "4,5,6", "7,8,9")  
//array1d is an array of strings

val array2d = array1d.map(x => x.split(","))
//array2d will be : Array( Array(1,2,3), Array(4,5,6), Array(7,8,9) )

val flatArray = array1d.flatMap(x => x.split(","))
//flatArray will be : Array (1,2,3,4,5,6,7,8,9)

当你想使用 flatMap 时,

  • 您的地图功能会导致创建多层结构
  • 但您想要的只是一个简单的 - 平面 - 一维结构,通过删除所有内部分组

【讨论】:

    【解决方案4】:

    所有的例子都很好....这是很好的视觉插图...来源礼貌:DataFlair training of spark

    Map:map 是 Apache Spark 中的一种转换操作。它适用于 RDD 的每个元素,并将结果作为新的 RDD 返回。在 Map 中,运维开发者可以定义自己的自定义业务逻辑。相同的逻辑将应用于 RDD 的所有元素。

    Spark RDD map 函数将一个元素作为输入,根据自定义代码(由开发人员指定)处理它,一次返回一个元素。 Map 将一个长度为 N 的 RDD 转换为另一个长度为 N 的 RDD。输入和输出 RDD 通常具有相同数量的记录。

    map 使用 scala 的示例:

    val x = spark.sparkContext.parallelize(List("spark", "map", "example",  "sample", "example"), 3)
    val y = x.map(x => (x, 1))
    y.collect
    // res0: Array[(String, Int)] = 
    //    Array((spark,1), (map,1), (example,1), (sample,1), (example,1))
    
    // rdd y can be re writen with shorter syntax in scala as 
    val y = x.map((_, 1))
    y.collect
    // res1: Array[(String, Int)] = 
    //    Array((spark,1), (map,1), (example,1), (sample,1), (example,1))
    
    // Another example of making tuple with string and it's length
    val y = x.map(x => (x, x.length))
    y.collect
    // res3: Array[(String, Int)] = 
    //    Array((spark,5), (map,3), (example,7), (sample,6), (example,7))
    

    平面图:

    flatMap 是一个转换操作。它适用于 RDD 的每个元素,并将结果作为新的RDD 返回。它类似于 Map,但 FlatMap 允许从 map 函数返回 0、1 或更多元素。在 FlatMap 操作中,开发者可以定义自己的自定义业务逻辑。相同的逻辑将应用于 RDD 的所有元素。

    “扁平化结果”是什么意思?

    FlatMap 函数将一个元素作为输入,根据自定义代码(由开发人员指定)对其进行处理,并一次返回 0 个或多个元素。 flatMap() 将一个长度为 N 的 RDD 转换为另一个长度为 M 的 RDD。

    flatMap 使用 scala 的示例:

    val x = spark.sparkContext.parallelize(List("spark flatmap example",  "sample example"), 2)
    
    // map operation will return Array of Arrays in following case : check type of res0
    val y = x.map(x => x.split(" ")) // split(" ") returns an array of words
    y.collect
    // res0: Array[Array[String]] = 
    //  Array(Array(spark, flatmap, example), Array(sample, example))
    
    // flatMap operation will return Array of words in following case : Check type of res1
    val y = x.flatMap(x => x.split(" "))
    y.collect
    //res1: Array[String] = 
    //  Array(spark, flatmap, example, sample, example)
    
    // RDD y can be re written with shorter syntax in scala as 
    val y = x.flatMap(_.split(" "))
    y.collect
    //res2: Array[String] = 
    //  Array(spark, flatmap, example, sample, example)
    

    【讨论】:

      【解决方案5】:

      如果你问的是 Spark 中 RDD.map 和 RDD.flatMap 的区别,map 将一个大小为 N 的 RDD 转换为另一个大小为 N 的 RDD。例如。

      myRDD.map(x => x*2)
      

      例如,如果 myRDD 由 Doubles 组成。

      虽然 flatMap 可以将 RDD 转换为另一种不同大小的 RDD: 例如:

      myRDD.flatMap(x =>new Seq(2*x,3*x))
      

      这将返回一个大小为 2*N 的 RDD 或

      myRDD.flatMap(x =>if x<10 new Seq(2*x,3*x) else new Seq(x) )
      

      【讨论】:

        【解决方案6】:

        test.md 为例:

        ➜  spark-1.6.1 cat test.md
        This is the first line;
        This is the second line;
        This is the last line.
        
        scala> val textFile = sc.textFile("test.md")
        scala> textFile.map(line => line.split(" ")).count()
        res2: Long = 3
        
        scala> textFile.flatMap(line => line.split(" ")).count()
        res3: Long = 15
        
        scala> textFile.map(line => line.split(" ")).collect()
        res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))
        
        scala> textFile.flatMap(line => line.split(" ")).collect()
        res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)
        

        如果你使用map方法,你会得到test.md的行数,对于flatMap方法,你会得到字数。

        map 方法类似于flatMap,都是返回一个新的 RDD。 map 方法经常使用返回一个新的 RDD,flatMap 方法经常使用拆分词。

        【讨论】:

          【解决方案7】:

          map 和 flatMap 是相似的,它们从输入的 RDD 中取一条线并在其上应用一个函数。它们的不同之处在于 map 中的函数只返回一个元素,而 flatMap 中的函数可以返回一个元素列表(0 个或多个)作为迭代器。

          此外,flatMap 的输出也被展平。虽然 flatMap 中的函数返回一个元素列表,但 flatMap 返回一个 RDD,它以平面方式(不是列表)包含列表中的所有元素。

          【讨论】:

            【解决方案8】:

            map 返回相同数量元素的 RDD,而 flatMap 可能不会。

            flatMap 的示例用例过滤掉丢失或不正确的数据。

            map 的示例用例用于输入和输出元素数量相同的各种情况。

            number.csv

            1
            2
            3
            -
            4
            -
            5
            

            ma​​p.py 将 add.csv 中的所有数字相加。

            from operator import *
            
            def f(row):
              try:
                return float(row)
              except Exception:
                return 0
            
            rdd = sc.textFile('a.csv').map(f)
            
            print(rdd.count())      # 7
            print(rdd.reduce(add))  # 15.0
            

            flatMap.py 在添加之前使用flatMap 过滤掉缺失的数据。与以前的版本相比,添加的数字更少。

            from operator import *
            
            def f(row):
              try:
                return [float(row)]
              except Exception:
                return []
            
            rdd = sc.textFile('a.csv').flatMap(f)
            
            print(rdd.count())      # 5
            print(rdd.reduce(add))  # 15.0
            

            【讨论】:

              【解决方案9】:

              可以从下面的示例 pyspark 代码中看出区别:

              rdd = sc.parallelize([2, 3, 4])
              rdd.flatMap(lambda x: range(1, x)).collect()
              Output:
              [1, 1, 2, 1, 2, 3]
              
              
              rdd.map(lambda x: range(1, x)).collect()
              Output:
              [[1], [1, 2], [1, 2, 3]]
              

              【讨论】:

                【解决方案10】:

                Flatmap 和 Map 都可以转换集合。

                区别:

                地图(功能)
                通过函数 func 传递源的每个元素,返回一个新的分布式数据集。

                flatMap(func)
                类似于 map,但每个输入项可以映射到 0 个或多个输出项(因此 func 应该返回一个 Seq 而不是单个项)。

                变换函数:
                ma​​p:一个元素输入->一个元素输出。
                flatMap:一个元素输入->0个或多个元素输出(一个集合)。

                【讨论】:

                  【解决方案11】:

                  RDD.map 返回单个数组中的所有元素

                  RDD.flatMap 返回数组中的元素

                  假设我们在 text.txt 文件中有文本

                  Spark is an expressive framework
                  This text is to understand map and faltMap functions of Spark RDD
                  

                  使用地图

                  val text=sc.textFile("text.txt").map(_.split(" ")).collect
                  

                  输出:

                  text: **Array[Array[String]]** = Array(Array(Spark, is, an, expressive, framework), Array(This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD))
                  

                  使用平面地图

                  val text=sc.textFile("text.txt").flatMap(_.split(" ")).collect
                  

                  输出:

                   text: **Array[String]** = Array(Spark, is, an, expressive, framework, This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD)
                  

                  【讨论】:

                    【解决方案12】:

                    map:它通过对RDD 的每个元素应用一个函数来返回一个新的RDD。 .map 中的函数只能返回一项。

                    flatMap:类似于map,它通过对RDD的每个元素应用一个函数返回一个新的RDD,但是输出是扁平的。

                    另外,flatMap 中的函数可以返回一个元素列表(0 个或更多)

                    例如:

                    sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()
                    

                    输出:[[1, 2], [1, 2, 3], [1, 2, 3, 4]]

                    sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()
                    

                    输出:通知 o/p 在单个列表 [1, 2, 1, 2, 3, 1、2、3、4]

                    来源:https://www.linkedin.com/pulse/difference-between-map-flatmap-transformations-spark-pyspark-pandey/

                    【讨论】:

                      【解决方案13】:

                      对于所有想要 PySpark 相关的人:

                      转换示例:flatMap

                      >>> a="hello what are you doing"
                      >>> a.split()
                      

                      ['你好','what','are','you','doing']

                      >>> b=["hello what are you doing","this is rak"]
                      >>> b.split()
                      

                      Traceback(最近一次调用最后一次): 文件“”,第 1 行,在 AttributeError: 'list' 对象没有属性 'split'

                      >>> rline=sc.parallelize(b)
                      >>> type(rline)
                      

                      >>> def fwords(x):
                      ...     return x.split()
                      
                      
                      >>> rword=rline.map(fwords)
                      >>> rword.collect()
                      

                      [['hello', 'what', 'are', 'you', 'doing'], ['this', 'is', 'rak']]

                      >>> rwordflat=rline.flatMap(fwords)
                      >>> rwordflat.collect()
                      

                      ['你好', 'what', 'are', 'you', 'doing', 'this', 'is', 'rak']

                      希望对你有帮助:)

                      【讨论】:

                        【解决方案14】:

                        地图:

                        是一种高阶方法,它接受一个函数作为输入并将其应用于源 RDD 中的每个元素。

                        http://commandstech.com/difference-between-map-and-flatmap-in-spark-what-is-map-and-flatmap-with-examples/

                        平面图:

                        采用输入函数的高阶方法和转换操​​作。

                        【讨论】:

                          【解决方案15】:

                          地图

                          通过对该 RDD 的每个元素应用一个函数来返回一个新的 RDD。

                          >>> rdd = sc.parallelize([2, 3, 4])
                          >>> sorted(rdd.map(lambda x: [(x, x), (x, x)]).collect())
                          [[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]
                          

                          平面地图

                          通过首先对该 RDD 的所有元素应用一个函数,然后将结果展平,从而返回一个新的 RDD。 这里可以将一个元素转换为多个元素

                          >>> rdd = sc.parallelize([2, 3, 4])
                          >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
                          [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
                          

                          【讨论】:

                            【解决方案16】:
                            • map(func) 返回一个新的分布式数据集,通过声明的函数 func 传递源的每个元素形成。所以 map() 是单项

                            一会儿

                            • flatMap(func) 类似于 map,但每个输入项可以映射到 0 个或多个输出项,因此 func 应该返回一个序列而不是单个项。

                            【讨论】:

                              【解决方案17】:

                              map和flatMap输出的区别:

                              1.flatMap

                              val a = sc.parallelize(1 to 10, 5)
                              
                              a.flatMap(1 to _).collect()
                              

                              输出:

                               1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
                              

                              2.map:

                              val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
                              
                              val b = a.map(_.length).collect()
                              

                              输出:

                              3 6 6 3 8
                              

                              【讨论】:

                                猜你喜欢
                                • 2019-04-01
                                • 2014-12-28
                                • 2011-10-05
                                • 1970-01-01
                                • 1970-01-01
                                • 1970-01-01
                                • 2015-03-26
                                • 2018-09-12
                                相关资源
                                最近更新 更多