【发布时间】:2017-04-04 10:41:47
【问题描述】:
我正在试图弄清楚 reduceByKey 是如何运作的,但这个案例让我很困惑,我根本无法理解。
代码是:
stream.foreachRDD((rdd: RDD[Record]) => {
// convert string to PoJo and generate rows as tuple group
val pairs = rdd
.map(row => (row.timestamp(), jsonDecode(row.value())))
.map(row => (row._2.getType.name(), (1, row._2.getValue, row._1)))
val flatten = pairs
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, (y._3 + x._3) / 2))
.map(f => Row.fromSeq(Seq(f._1, f._2._2 / f._2._1, new Timestamp(f._2._3))))
想象一下数据收入: ["大洋洲", 500], ["澳大利亚", 450] 等。
在 flatten 变量中,我尝试按市场类型或 JSON 中的第一种类型聚合数据。这是生成元组:*第一个是计数器值,这个值为1,
* 第二个是从 Kafka 收到的费率,
* 第三个是活动时间。例如2017-05-12 16:00:00
*
* 在地图中,
* 方法f._1 是市场名称,
* 我们将总费率除以总项目数f._2._2 / f._2._1
* 如您所见,f._2._3 是平均事件时间
有人能帮我解释一下f._2._3 是什么意思吗(我的意思是我知道它的临时变量,但里面有什么或可能有什么)以及如何通过除以f._2._2 / f._2._1 来计算总速率,究竟是什么?谢谢你:)
【问题讨论】:
-
也许是时候做一些代表JSON结构的
case classes,然后你就可以自己回答你的问题了:) -
好的,现在我有一个 struct 有 3 个文件。您是在建议使 case class 类似于 struct aaa 并用对象变量替换
f_2._2等? -
如果你有带有命名字段的结构,它的可读性会更高,你不会有这样的问题;)
-
嗯,我阅读这段代码没有问题,我知道每个
f的含义也是上面提到的。我的问题是理解让我称之为多维reduceByKey以及它在这个例子中是如何工作的。我阅读了几十篇关于该方法的文章,但仍然无法整理出一些东西,这就是我在这里寻求帮助的原因:)
标签: scala apache-spark spark-streaming scala-collections