【问题标题】:Spark mapPartitions closure behaviorSpark mapPartitions 关闭行为
【发布时间】:2017-11-06 08:54:42
【问题描述】:

这是一个非常常见的火花相关问题,涉及处理在哪个火花公园(执行者/驱动程序)上执行哪段代码的情况。 有了这段代码,我有点惊讶为什么我没有得到我期望的值:

1    stream
2      .foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
3        val offsetRanges = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
4        import argonaut.Argonaut.StringToParseWrap
5
6        val rdd: RDD[SimpleData] = kafkaRdd.mapPartitions((records: Iterator[ConsumerRecord[String, String]]) => {
7          val invalidCount: AtomicLong = new AtomicLong(0)
8          val convertedData: Iterator[SimpleData] = records.map(record => {
9            val maybeData: Option[SimpleData] = record.value().decodeOption[SimpleData]
10           if (maybeData.isEmpty) {
11             logger.error("Cannot parse data from kafka: " + record.value())
12             invalidCount.incrementAndGet()
13           }
14           maybeData
15         })
16           .filter(_.isDefined)
17           .map(_.get)
18
19         val statsDClient = new NonBlockingStatsDClient("appName", "monitoring.host", 8125) // I know it should be a singleton :)
20         statsDClient.gauge("invalid-input-records", invalidCount.get())
21
22         convertedData
23       })
24
25       rdd.collect().length
26       stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
27     })

想法:从具有无效格式(如果有)的 kafka 报告编号条目中获取 JSON 数据。 我假设当我使用 mapPartitions 方法时,里面的代码将为我拥有的每个分区执行。 IE。我希望第 7-22 行将被包装/closure-d 并发送给 executor 执行。在这种情况下,我期待

无效数据

变量将在 executor 的执行范围内,如果在 json->object 转换过程中发生错误,将被更新(第 10-13 行)。因为在内部没有 RDD 或其他东西的概念 - 只有常规条目上的常规 scala 迭代器。 在第 19-20 行,statsd 客户端向度量服务器发送 invalidData 值。 显然我总是得到 '0' 结果。

但是,如果我将代码更改为:

1     stream
2       .foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
3         val offsetRanges = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
4
5         // this is ugly we have to repeat it - but argonaut is NOT serializable...
6         val rdd: RDD[SimpleData] = kafkaRdd.mapPartitions((records: Iterator[ConsumerRecord[String, String]]) => {
7           import argonaut.Argonaut.StringToParseWrap
8            val convertedDataTest: Iterator[(Option[SimpleData], String)] = records.map(record => {
9             val maybeData: Option[SimpleData] = record.value().decodeOption[SimpleData]
10            (maybeData, record.value())
11          })
12
13          val testInvalidDataEntries: Int = convertedDataTest.count(record => {
14            val empty = record._1.isEmpty
15            if (empty) {
16              logger.error("Cannot parse data from kafka: " + record._2)
17            }
18            empty
19          })
20          val statsDClient = new NonBlockingStatsDClient("appName", "monitoring.host", 8125) // I know it should be a singleton :)
21          statsDClient.gauge("invalid-input-records", testInvalidDataEntries)
22
23          convertedDataTest
24            .filter(maybeData => maybeData._1.isDefined)
25            .map(data => data._1.get)
26        })
27
28        rdd.collect().length
29        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
30      })

它按预期工作。 IE。如果我隐含地计算无效条目,我会得到期望值。

不知道为什么。想法?

可以在github找到要玩的代码

【问题讨论】:

    标签: apache-spark spark-streaming


    【解决方案1】:

    原因其实很简单,与 Spark 完全无关。

    查看这个 Scala 控制台示例,它根本不涉及 Spark:

    scala> val iterator: Iterator[String] = Seq("a", "b", "c").iterator
        iterator: Iterator[String] = non-empty iterator
    
    scala> val count = new java.util.concurrent.atomic.AtomicInteger(0)
        count: java.util.concurrent.atomic.AtomicInteger = 0
    
    scala> val mappedIterator = iterator.map(letter => {print("mapping!! "); count.incrementAndGet(); letter})
        mappedIterator: Iterator[String] = non-empty iterator
    
    scala> count.get
        res3: Int = 0
    

    看看我是如何从一个迭代器和一个新计数器开始的,我映射到这个迭代器上但什么也没发生:println 没有显示,计数仍然为零。

    但是当我实现mappedIterator的内容时:

    scala> mappedIterator.next
        mapping!! res1: String = a
    

    现在发生了一些事情,我得到了一个 print 和一个增强计数器。

    scala> count.get
        res2: Int = 1
    

    您在 spark 执行器上的代码中也会发生同样的情况。

    这是因为Scala iterators are lazy 相对于map 操作。 (另见herehere

    所以,在您的第一个示例中,按时间顺序发生的是:

    1. 您在原始分区迭代器上定义了一个转换(但您不执行转换本身)
    2. 你推送你的计数器变量,它处于初始状态(因为没有发生转换)
    3. 您将 Spark 传递给转换就绪的迭代器
    4. Spark 实际上会迭代此结果,因此会发生映射。但是您的副作用(第 2 步)已经执行。

    在第二种情况下,您调用 val testInvalidDataEntries: Int = convertedDataTest.count... 执行实际映射(在此过程中,计数器的增量),然后将您的计数器发送到您的服务器。

    所以,正是懒惰让您的 2 个样本表现不同。

    (这也是为什么,一般来说,从理论上讲,我们倾向于在面向函数式编程的语言中的map 操作中不产生副作用,因为结果依赖于执行顺序,而纯函数式风格应该防止这个)。

    计算失败的一种方法是使用Spark Accumulator 来累积结果并在 RDD 完成终端操作后在驱动程序端执行更新。

    【讨论】:

    • 不确定我是否在关注(第一个代码 sn-p):我假设第 6 - 23 行大括号之间的所有内容都将被包装为闭包并发送给负责相应分区的执行程序?包括 invalidCount var 定义?所以基本上整个代码 sn-p 将在相应的执行程序上为每个分区执行。没有外部参考,所以一切都应该没问题。如果 spark 做了一些“优化”——这种情况下的规则是什么?
    • 您是对的,但您的问题与 Spark 无关。这是关于您的语句的评估顺序(在您的第一个示例中调用statsDClient.gauge("invalid-input-records", invalidCount.get()) 时,迭代器的mapping 尚未发生。invalidCount 变量的值为零。我将编辑我的在顶部发帖尝试“展示”你。
    • 所以基本上如果我在第 17 行添加 .length 它应该可以工作吗?
    • 确实如此。或者实现它(.toList 第 17 行并在 mapPartition 的末尾返回 .toIterator)。
    • 谢谢。赏金很快就会到 - 在接下来的 20 小时内无法添加。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-09-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多