【发布时间】:2015-07-09 07:10:38
【问题描述】:
我必须使用 Apache Spark 和 Scala 作为编程语言对数据集执行以下任务:
- 从 HDFS 读取数据集。一些示例行如下所示:
deviceid,bytes,eventdate 15590657,246620,20150630 14066921,1907,20150621 14066921,1906,20150626 6522013,2349,20150626 6522013,2525,20150613
按设备 ID 对数据进行分组。因此我们现在有了一个 deviceid => (bytes,eventdate)
的映射
对于每个设备,按事件日期对集合进行排序。我们现在有一个基于每个设备的事件日期的有序字节集。
从此有序集中挑选最近 30 天的字节。
使用时间段 30 求最后一个日期的字节移动平均值。
使用 30 的时间段找出最终日期的字节标准差。
在结果中返回两个值 (mean - kstddev) 和 (mean + kstddev) [假设 k = 3]
我使用的是 Apache Spark 1.3.0。实际的数据集更宽,最终必须在十亿行上运行。
这是数据集的数据结构。
package com.testing
case class DeviceAggregates (
device_id: Integer,
bytes: Long,
eventdate: Integer
) extends Ordered[DailyDeviceAggregates] {
def compare(that: DailyDeviceAggregates): Int = {
eventdate - that.eventdate
}
}
object DeviceAggregates {
def parseLogLine(logline: String): DailyDeviceAggregates = {
val c = logline.split(",")
DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
}
}
DeviceAnalyzer 类如下所示:
package com.testing
import com.testing.DeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Sorting
object DeviceAnalyzer {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Device Statistics Analyzer")
val sc = new SparkContext(sparkConf)
val logFile = args(0)
val deviceAggregateLogs = sc.textFile(logFile).map(DeviceAggregates.parseLogLine).cache()
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
deviceIdsMap.foreach(
// I am stuck here !!
})
sc.stop()
}
}
但我对这个算法的实际实现感到困惑。
【问题讨论】:
-
采样率是多少?一天的数据能存入内存吗?除非您具有极高的采样率,否则我不明白为什么不这样做。所以我建议只在普通 scala 中实现聚合并为它编写一些测试以确保它符合您的要求。
-
我看不出你真的被卡住了。下一步是“对于每个设备,按事件日期对集合进行排序。”。是什么阻止你做这一步?我认为您需要展示更多尝试这样做的证据,或者看起来您希望我们编写您的代码
-
你应该让问题更加集中。我不需要知道项目的每一个细节来帮助你。只需询问您需要帮助的具体问题即可。 (就像保罗说的那样。)
-
一天的数据现在可以放入内存。最终这将在一个大集群上运行,以便在代码开始工作时解决。我卡住的地方是 Scala 语法。我会试着从stackoverflow.com/questions/23402303/… 那里得到一些提示,然后今天继续。
标签: java scala hadoop apache-spark