【问题标题】:ReduceByKey + Map + Seq explanationReduceByKey + Map + Seq 解释
【发布时间】:2017-04-04 10:41:47
【问题描述】:

我正在试图弄清楚 reduceByKey 是如何运作的,但这个案例让我很困惑,我根本无法理解。

代码是:

 stream.foreachRDD((rdd: RDD[Record]) => {
      // convert string to PoJo and generate rows as tuple group
    val pairs = rdd
            .map(row => (row.timestamp(), jsonDecode(row.value())))
            .map(row => (row._2.getType.name(), (1, row._2.getValue, row._1)))
    val flatten = pairs
                .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, (y._3 + x._3) / 2))
                .map(f => Row.fromSeq(Seq(f._1, f._2._2 / f._2._1, new Timestamp(f._2._3))))

想象一下数据收入: ["大洋洲", 500], ["澳大利亚", 450] 等。

flatten 变量中,我尝试按市场类型或 JSON 中的第一种类型聚合数据。这是生成元组:*第一个是计数器值,这个值为1, * 第二个是从 Kafka 收到的费率, * 第三个是活动时间。例如2017-05-12 16:00:00 * * 在地图中, * 方法f._1 是市场名称, * 我们将总费率除以总项目数f._2._2 / f._2._1 * 如您所见,f._2._3 是平均事件时间

有人能帮我解释一下f._2._3 是什么意思吗(我的意思是我知道它的临时变量,但里面有什么或可能有什么)以及如何通过除以f._2._2 / f._2._1 来计算总速率,究竟是什么?谢谢你:)

【问题讨论】:

  • 也许是时候做一些代表JSON结构的case classes,然后你就可以自己回答你的问题了:)
  • 好的,现在我有一个 struct 有 3 个文件。您是在建议使 case class 类似于 struct aaa 并用对象变量替换 f_2._2 等?
  • 如果你有带有命名字段的结构,它的可读性会更高,你不会有这样的问题;)
  • 嗯,我阅读这段代码没有问题,我知道每个f 的含义也是上面提到的。我的问题是理解让我称之为多维reduceByKey 以及它在这个例子中是如何工作的。我阅读了几十篇关于该方法的文章,但仍然无法整理出一些东西,这就是我在这里寻求帮助的原因:)

标签: scala apache-spark spark-streaming scala-collections


【解决方案1】:

对于每一行,您在 RDD pairs 中定义以下元素:

(marketType, (counter, rate, eventTime))

请注意,这是一个Tuple2,其第二个元素是Tuple3Tuples 是特殊情况类,其n-th 元素(从1 开始)被命名为_n。例如,要访问元素frate,您必须执行f._2._2Tuple3 的第二个元素,它是Tuple2 的第二个元素)。

由于您的元素具有特殊含义,您可能需要考虑定义一个案例类MyRow(counter: Int, rate: Int, time: Timestamp),以便在编写f._2._3 之类的内容时更清楚地了解您的代码在做什么(顺便说一下,我不清楚eventTime 的类型,因为您只将其表示为String,但您对其进行了数值运算)。

现在看看你的代码真正想要做什么:

归约函数需要两个 Tuple3(或 MyRow,如果您更改代码)并输出另一个(在这里,您的归约函数对计数器、速率求和,并在事件时间)。

reduceByKey 只要找到具有相同键的两个元素就应用此归约函数:由于归约函数的输出与其输入的类型相同,因此可以对其应用它,只要您有您的 RDD 上具有相同键的其他值。

举个简单的例子,如果你有

(key1, (1, 200, 2017/04/04 12:00:00))
(key1, (1, 300, 2017/04/04 12:00:00))
(key1, (1, 500, 2017/04/04 12:00:00))
(key2, (1, 500, 2017/04/04 12:00:00))

然后reduceByKey会输出

(key1, (3, 1000, 2017/04/04 12:00:00))
(key2, (1, 500, 2017/04/04 12:00:00))

然后你的最后一个map 将通过计算总费率来解决这个问题:

(key1, (333, 2017/04/04 12:00:00))
(key2, (500, 2017/04/04 12:00:00))

您可能已经注意到,我在所有示例中总是使用相同的时间。这是因为您在该字段上的归约函数会产生意想不到的结果,因为它不是关联。尝试执行与上述相同的练习,但使用不同的时间戳,您会发现 key1 的减少值将根据您应用减少的顺序而有所不同。

让我们看看:我们想用这个函数减少 4、8 和 16,所以我们可能想这样做

((4 + 8) / 2 + 16) / 2

或作为

(4 + (8 + 16) / 2) / 2

取决于我们是从左边还是右边开始(在实际情况下,还有更多不同的可能性,它们会在 Spark 中发生,因为您并不总是知道您的值是如何分布在集群)。

计算上面的两种可能性,我们得到不同的值:118,所以你会发现这在现实生活中可能会导致更大的问题。

在您的情况下,一个简单的解决方案是同时计算所有时间戳的总和(假设它们是 Long 值,甚至是 BigInteger,以避免溢出),最后只除以值的数量获得实时平均值。

【讨论】:

  • 这太棒了,我欠你很多时间 ^^ 你向我解释了我需要的一切,进一步datetimeTimeStampType 中,所以如果你对此有任何建议.非常感谢,非常感谢:)
  • 处理时间戳的最简单方法是使用getMillis 或同等名称,将时间戳更改为Long
猜你喜欢
  • 1970-01-01
  • 2016-02-12
  • 1970-01-01
  • 2014-08-25
  • 2020-07-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-08-26
相关资源
最近更新 更多