【问题标题】:Moving average of dataset in Apache Spark and ScalaApache Spark 和 Scala 中数据集的移动平均值
【发布时间】:2015-07-09 07:10:38
【问题描述】:

我必须使用 Apache Spark 和 Scala 作为编程语言对数据集执行以下任务:

  1. 从 HDFS 读取数据集。一些示例行如下所示:
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
  1. 按设备 ID 对数据进行分组。因此我们现在有了一个 deviceid => (bytes,eventdate)

  2. 的映射
  3. 对于每个设备,按事件日期对集合进行排序。我们现在有一个基于每个设备的事件日期的有序字节集。

  4. 从此有序集中挑选最近 30 天的字节。

  5. 使用时间段 30 求最后一个日期的字节移动平均值。

  6. 使用 30 的时间段找出最终日期的字节标准差。

  7. 在结果中返回两个值 (mean - kstddev) 和 (mean + kstddev) [假设 k = 3]

我使用的是 Apache Spark 1.3.0。实际的数据集更宽,最终必须在十亿行上运行。

这是数据集的数据结构。

package com.testing
case class DeviceAggregates (
                        device_id: Integer,
                        bytes: Long,
                        eventdate: Integer
                   ) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}
object DeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
    val c = logline.split(",")
    DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}

DeviceAnalyzer 类如下所示:

package com.testing
import com.testing.DeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Statistics Analyzer")
    val sc = new SparkContext(sparkConf)
    val logFile = args(0)
    val deviceAggregateLogs = sc.textFile(logFile).map(DeviceAggregates.parseLogLine).cache()
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    deviceIdsMap.foreach(
         // I am stuck here !!
    })
    sc.stop()
  }
}

但我对这个算法的实际实现感到困惑。

【问题讨论】:

  • 采样率是多少?一天的数据能存入内存吗?除非您具有极高的采样率,否则我不明白为什么不这样做。所以我建议只在普通 scala 中实现聚合并为它编写一些测试以确保它符合您的要求。
  • 我看不出你真的被卡住了。下一步是“对于每个设备,按事件日期对集合进行排序。”。是什么阻止你做这一步?我认为您需要展示更多尝试这样做的证据,或者看起来您希望我们编写您的代码
  • 你应该让问题更加集中。我不需要知道项目的每一个细节来帮助你。只需询问您需要帮助的具体问题即可。 (就像保罗说的那样。)
  • 一天的数据现在可以放入内存。最终这将在一个大集群上运行,以便在代码开始工作时解决。我卡住的地方是 Scala 语法。我会试着从stackoverflow.com/questions/23402303/… 那里得到一些提示,然后今天继续。

标签: java scala hadoop apache-spark


【解决方案1】:

我有一个非常粗略的实现来完成这项工作,但它达不到标准。抱歉,我对 Scala/Spark 很陌生,所以我的问题非常基础。这是我现在拥有的:

import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

    deviceIdsMap.foreach(a => {
      val device_id = a._1  // This is the device ID
      val allaggregates = a._2  // This is an array of all device-aggregates for this device

      println(allaggregates)
      val sortedAggregates = Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
      println(sortedAggregates) // This does not work - returns an empty array !!

      val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray  // This should be sortedAggregates.map (but does not compile)
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

      //val vector: Vector = Vectors.dense(byteValues)
      //println(vector)
      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
    })

    sc.stop()
  }
}

如果有人可以提出以下改进建议,我将不胜感激:

  1. 对 Sorting.quicksort 的调用不起作用。也许我说错了。
  2. 我想使用 Spark mllib 类 MultivariateStatisticalSummary 来计算统计值。
  3. 为此,我需要将所有中间值保留为 RDD,以便我可以直接使用 RDD 方法来完成这项工作。
  4. 最后,我还需要将结果写入 HDFS,RDD 类为此提供了一个方法,这也是我希望将所有内容保留为 RDD 的另一个原因。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-06-17
    • 1970-01-01
    • 2019-03-08
    • 1970-01-01
    • 2020-07-29
    • 1970-01-01
    • 1970-01-01
    • 2018-04-23
    相关资源
    最近更新 更多