【发布时间】:2017-11-06 08:54:42
【问题描述】:
这是一个非常常见的火花相关问题,涉及处理在哪个火花公园(执行者/驱动程序)上执行哪段代码的情况。 有了这段代码,我有点惊讶为什么我没有得到我期望的值:
1 stream
2 .foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
3 val offsetRanges = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
4 import argonaut.Argonaut.StringToParseWrap
5
6 val rdd: RDD[SimpleData] = kafkaRdd.mapPartitions((records: Iterator[ConsumerRecord[String, String]]) => {
7 val invalidCount: AtomicLong = new AtomicLong(0)
8 val convertedData: Iterator[SimpleData] = records.map(record => {
9 val maybeData: Option[SimpleData] = record.value().decodeOption[SimpleData]
10 if (maybeData.isEmpty) {
11 logger.error("Cannot parse data from kafka: " + record.value())
12 invalidCount.incrementAndGet()
13 }
14 maybeData
15 })
16 .filter(_.isDefined)
17 .map(_.get)
18
19 val statsDClient = new NonBlockingStatsDClient("appName", "monitoring.host", 8125) // I know it should be a singleton :)
20 statsDClient.gauge("invalid-input-records", invalidCount.get())
21
22 convertedData
23 })
24
25 rdd.collect().length
26 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
27 })
想法:从具有无效格式(如果有)的 kafka 报告编号条目中获取 JSON 数据。 我假设当我使用 mapPartitions 方法时,里面的代码将为我拥有的每个分区执行。 IE。我希望第 7-22 行将被包装/closure-d 并发送给 executor 执行。在这种情况下,我期待
无效数据
变量将在 executor 的执行范围内,如果在 json->object 转换过程中发生错误,将被更新(第 10-13 行)。因为在内部没有 RDD 或其他东西的概念 - 只有常规条目上的常规 scala 迭代器。 在第 19-20 行,statsd 客户端向度量服务器发送 invalidData 值。 显然我总是得到 '0' 结果。
但是,如果我将代码更改为:
1 stream
2 .foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
3 val offsetRanges = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
4
5 // this is ugly we have to repeat it - but argonaut is NOT serializable...
6 val rdd: RDD[SimpleData] = kafkaRdd.mapPartitions((records: Iterator[ConsumerRecord[String, String]]) => {
7 import argonaut.Argonaut.StringToParseWrap
8 val convertedDataTest: Iterator[(Option[SimpleData], String)] = records.map(record => {
9 val maybeData: Option[SimpleData] = record.value().decodeOption[SimpleData]
10 (maybeData, record.value())
11 })
12
13 val testInvalidDataEntries: Int = convertedDataTest.count(record => {
14 val empty = record._1.isEmpty
15 if (empty) {
16 logger.error("Cannot parse data from kafka: " + record._2)
17 }
18 empty
19 })
20 val statsDClient = new NonBlockingStatsDClient("appName", "monitoring.host", 8125) // I know it should be a singleton :)
21 statsDClient.gauge("invalid-input-records", testInvalidDataEntries)
22
23 convertedDataTest
24 .filter(maybeData => maybeData._1.isDefined)
25 .map(data => data._1.get)
26 })
27
28 rdd.collect().length
29 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
30 })
它按预期工作。 IE。如果我隐含地计算无效条目,我会得到期望值。
不知道为什么。想法?
可以在github找到要玩的代码
【问题讨论】:
标签: apache-spark spark-streaming