【问题标题】:Iterating an RDD and updating a mutable collection returns an empty collection迭代 RDD 并更新可变集合返回一个空集合
【发布时间】:2017-05-02 19:24:44
【问题描述】:

我是 Scala 和 Spark 的新手,希望得到一些帮助来理解为什么下面的代码没有产生我想要的结果。

我在比较两张表

我想要的输出模式是:

case class DiscrepancyData(fieldKey:String, fieldName:String, val1:String, val2:String, valExpected:String)

当我手动逐步运行以下代码时,我实际上最终得到了我想要的结果。这是一个完全填充了我想要的输出的List[DiscrepancyData]。但是,我必须在下面的代码中遗漏一些东西,因为它返回一个空列表(在调用此代码之前,还有其他代码涉及从 HIVE 读取表、映射、分组、过滤等):

val compareCols  = Set(year, nominal, adjusted_for_inflation, average_private_nonsupervisory_wage)

val key = "year"

def compare(table:RDD[(String, Iterable[Row])]): List[DiscrepancyData] = {
    var discs: ListBuffer[DiscrepancyData] = ListBuffer()
    def compareFields(fieldOne:String, fieldTwo:String, colName:String, row1:Row, row2:Row): DiscrepancyData = {
        if (fieldOne != fieldTwo){
            DiscrepancyData(
                row1.getAs(key).toString, //fieldKey
                colName, //fieldName
                row1.getAs(colName).toString, //table1Value
                row2.getAs(colName).toString, //table2Value
                row2.getAs(colName).toString) //expectedValue
        }
        else null
    }
    def comparison() {
        for(row <- table){
            var elem1 = row._2.head //gets the first element in the iterable
            var elem2 = row._2.tail.head //gets the second element in the iterable

            for(col <- compareCols){
                var value1 = elem1.getAs(col).toString
                var value2 = elem2.getAs(col).toString

                var disc = compareFields(value1, value2, col, elem1, elem2)

                if (disc != null) discs += disc
            }
        }
    }

    comparison()

    discs.toList
}

我这样调用上面的函数:

var outcome = compare(groupedFiltered)

这是groupedFiltered中的数据:

(1991,CompactBuffer([1991,7.14,5.72,39%], [1991,4.14,5.72,39%]))
(1997,CompactBuffer([1997,4.88,5.86,39%], [1997,3.88,5.86,39%]))
(1999,CompactBuffer([1999,5.15,5.96,39%], [1999,5.15,5.97,38%]))
(1947,CompactBuffer([1947,0.9,2.94,35%], [1947,0.4,2.94,35%]))
(1980,CompactBuffer([1980,3.1,6.88,45%], [1980,3.1,6.88,48%]))
(1981,CompactBuffer([1981,3.15,6.8,45%], [1981,3.35,6.8,45%]))

groupedFiltered 的表架构:

(year String, 
nominal Double,
adjusted_for_inflation Double, 
average_provate_nonsupervisory_wage String)

【问题讨论】:

    标签: scala apache-spark bigdata


    【解决方案1】:

    Spark 是一个分布式计算引擎。除了经典单节点计算的“代码在做什么”之外,在 Spark 中我们还需要考虑“代码在哪里运行”

    让我们检查一下上面表达式的简化版本:

    val records: RDD[List[String]] = ??? //whatever data
    var list:mutable.List[String] = List()
    for {record <- records
         entry <- records } 
        { list += entry }
    

    scala for-comprehension 使这个表达式看起来像一个自然的本地计算,但实际上 RDD 操作被序列化并“运送”到执行器,内部操作将在本地执行。我们可以这样重写上面的代码:

    records.foreach{ record =>     //RDD.foreach => serializes closure and executes remotely
         record.foreach{entry =>   //record.foreach => local operation on the record collection
            list += entry          // this mutable list object is updated in each executor but never sent back to the driver. All updates are lost  
         }
    }
    

    可变对象在分布式计算中通常是不可行的。想象一下,一个执行者添加了一条记录,另一个执行者删除了它,正确的结果是什么?或者每个执行者都有不同的价值,哪个是正确的?

    要实现上面的操作,我们需要将数据转换成我们想要的结果。

    我将从应用另一个最佳实践开始:不要使用null 作为返回值。我还将行操作移到函数中。考虑到这一点,让我们重写比较操作:

    def compareFields(colName:String, row1:Row, row2:Row): Option[DiscrepancyData] = {
        val key = "year"
        val v1 = row1.getAs(colName).toString
        val v2 = row2.getAs(colName).toString
        if (v1 != v2){
            Some(DiscrepancyData(
                row1.getAs(key).toString, //fieldKey
                colName, //fieldName
                v1, //table1Value
                v2, //table2Value
                v2) //expectedValue
            )
        } else None
    }
    

    现在,我们可以将差异计算重写为初始 table 数据的转换:

    val discrepancies = table.flatMap{case (str, row) =>
        compareCols.flatMap{col => compareFields(col, row.next, row.next) }   
    }
    

    我们也可以使用for-comprehension 表示法,现在我们了解了运行的位置:

    val discrepancies = for {
        (str,row) <- table
        col <- compareCols
        dis <- compareFields(col, row.next, row.next)
    } yield dis
    

    请注意,discrepancies 的类型为 RDD[Discrepancy]。如果我们想将实际值传递给驱动程序,我们需要:

    val materializedDiscrepancies = discrepancies.collect()
    

    【讨论】:

    • 抱歉回复晚了!这行得通。我只需要改变一些东西。 table 已更改为 groupFiltered。和row.next 产生错误:value next is not a member of Iterable[org.apache.spark.sql.Row]... 所以我把它改成了compareFields(col, row.head, row.tail.head)
    【解决方案2】:

    遍历 RDD 并更新在循环外定义的可变结构是Spark 反模式

    想象一下这个 RDD 分布在 200 台机器上。这些机器如何更新相同的缓冲区?他们不可以。每个 JVM 都会看到自己的discs: ListBuffer[DiscrepancyData]。最后,你的结果不会是你所期望的。

    总而言之,这是一个完全有效的(虽然不是惯用的)Scala 代码,但不是有效的 Spark 代码。如果您将 RDD 替换为 Array,它将按预期工作。

    尝试按照以下思路进行功能更强大的实现:

    val finalRDD: RDD[DiscrepancyData] = table.map(???).filter(???) 
    

    【讨论】:

    • 如果我转换为数组会发生什么?一切都会在一个节点上计算吗?如果是这样,我会失去效率,对吗?
    • 是的,使用数组意味着您在 Spark 之外执行此操作,因此您失去了所有并行性。
    猜你喜欢
    • 2021-05-07
    • 1970-01-01
    • 2018-02-09
    • 1970-01-01
    • 1970-01-01
    • 2019-10-08
    • 2012-07-07
    • 2012-12-28
    相关资源
    最近更新 更多