【问题标题】:Convert Array[(String,String)] type to RDD[(String,String)] type in spark在 spark 中将 Array[(String,String)] 类型转换为 RDD[(String,String)] 类型
【发布时间】:2017-01-29 15:45:53
【问题描述】:

我是火花新手。

这是我的代码:

val Data = sc.parallelize(List(
      ("I", "India"), 
      ("U", "USA"), 
      ("W", "West"))) 

val DataArray = sc.broadcast(Data.collect)

val FinalData = DataArray.value

这里的FinalDataArray[(String, String)] 类型。 但我希望数据采用RDD[(String, String)] 类型的形式。

我可以将FinalData 转换为RDD[(String, String)] 类型吗?

更多详情:

我想加入两个RDD所以优化加入条件(从性能角度来看) 我正在向所有集群广播小型 RDD,以便减少数据混洗。(间接性能会得到改善) 所以对于这一切,我正在写这样的东西:

//Big Data
val FirstRDD = sc.parallelize(List(****Data of first table****))

//Small Data
val SecondRDD = sc.parallelize(List(****Data of Second table****)) 

所以我肯定会广播小数据集(意味着 SecondRDD)

val DataArray = sc.broadcast(SecondRDD.collect)

val FinalData = DataArray.value

//这里会报错

val Join = FirstRDD.leftOuterJoin(FinalData)

找到数组需要的RDD

这就是我寻找 Array 到 RDD 转换的原因。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    真正的问题是您正在通过收集RDD 创建一个Broadcast 变量(请注意,此操作会将RDD 转换为Array)。所以,我的意思是你已经有一个RDD,也就是Data,这个变量的值和FinalData完全一样,但是是你想要的RDD[(String, String)]

    您可以在以下输出中检查这一点。

    Data: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[2] at parallelize at <console>:32
    DataArray: org.apache.spark.broadcast.Broadcast[Array[(String, String)]] = Broadcast(1)
    FinalData: Array[(String, String)] = Array((I,India), (U,USA), (W,West))
    

    虽然,我不明白你的方法你只需要并行化Broadcast的值。

    // You already have this data stored in `Data`, so it's useless repeat this process.
    val DataCopy = sc.parallelize(DataArray.value)
    

    编辑

    再次阅读您的问题后,我相信问题几乎相同。您正在尝试 joinRDDBroadcast,这是不允许的。但是,如果您阅读文档,您可能会注意到joinRDDs 都是可能的(参见下面的代码)。

    val joinRDD = FirstRDD.keyBy(_._1).join(SecondRDD.keyBy(_._1))
    

    【讨论】:

    • 但广播后我必须再次将 FinalData 转换为 RDD。由于 FinalData 是 Array 类型。
    • @Darshan 也许您没有注意到Data 具有RDD[(String, String)] 类型的事实——这正是您所寻找的...
    • 我想使用一个功能。但是这个函数在 RDD 中而不是在 Array 中。这就是为什么
    • @Tzach Zohar 是的 Data 有类型 RDD[(String, String)]
    • 因此您有意希望在驱动程序应用程序和集群节点之间来回移动数据 4 次,最终得到一个 相同 值?
    【解决方案2】:

    广播确实有助于提高大型 RDD 和较小 RDD 之间的 JOIN 性能。当您这样做时,广播(连同mapmapPartitions替换 连接,它不会在 连接中使用,因此您绝不会需要“将广播转换为 RDD”。

    它的外观如下:

    val largeRDD = sc.parallelize(List(
      ("I", "India"),
      ("U", "USA"),
      ("W", "West")))
    
    val smallRDD = sc.parallelize(List(
      ("I", 34),
      ("U", 45)))
    
    val smaller = sc.broadcast(smallRDD.collectAsMap())
    
    // using "smaller.value" inside the function passed to RDD.map ->
    // on executor side. Broadcast made sure it's copied to each executor (once!)
    val joinResult = largeRDD.map { case (k, v) => (k, v, smaller.value.get(k)) }
    
    joinResult.foreach(println)
    // prints:
    // (I,India,Some(34))
    // (W,West,None)
    // (U,USA,Some(45))
    

    查看类似的解决方案(使用mapPartitions),这可能更有效here

    【讨论】:

      猜你喜欢
      • 2020-08-17
      • 2021-09-28
      • 1970-01-01
      • 2015-12-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-10
      • 2018-11-26
      相关资源
      最近更新 更多