【发布时间】:2016-03-21 11:22:39
【问题描述】:
我有一个非常简单的 (n00b) 问题,但不知何故我被卡住了。我正在尝试使用wholeTextFiles 读取Spark 中的一组文件,并希望返回RDD[LogEntry],其中LogEntry 只是一个案例类。我想得到一个有效条目的 RDD,我需要使用正则表达式来提取我的案例类的参数。当条目无效时,我不希望提取器逻辑失败,而只是在日志中写入一个条目。为此,我使用 LazyLogging。
object LogProcessors extends LazyLogging {
def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[Option[CleaningLogEntry]] = {
val pattern = "<some pattern>".r
val logs = sc.wholeTextFiles(path, numPartitions)
val entries = logs.map(fileContent => {
val file = fileContent._1
val content = fileContent._2
content.split("\\r?\\n").map(line => line match {
case pattern(dt, ev, seq) => Some(LogEntry(<...>))
case _ => logger.error(s"Cannot parse $file: $line"); None
})
})
这给了我一个RDD[Array[Option[LogEntry]]]。有没有一种巧妙的方法来结束LogEntrys 的 RDD?我有点想念它。
我正在考虑改用Try,但我不确定这是否更好。
非常感谢您的想法。
【问题讨论】:
-
您是否在寻找最终
RDD的签名为RDD[LongEntry]或RDD[Array[LogEntry]]或其他?
标签: scala apache-spark pattern-matching rdd