【问题标题】:Pattern matching and RDDs模式匹配和 RDD
【发布时间】:2016-03-21 11:22:39
【问题描述】:

我有一个非常简单的 (n00b) 问题,但不知何故我被卡住了。我正在尝试使用wholeTextFiles 读取Spark 中的一组文件,并希望返回RDD[LogEntry],其中LogEntry 只是一个案例类。我想得到一个有效条目的 RDD,我需要使用正则表达式来提取我的案例类的参数。当条目无效时,我不希望提取器逻辑失败,而只是在日志中写入一个条目。为此,我使用 LazyLogging。

object LogProcessors extends LazyLogging {

  def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[Option[CleaningLogEntry]] = {

    val pattern = "<some pattern>".r

    val logs = sc.wholeTextFiles(path, numPartitions)
    val entries = logs.map(fileContent => {
      val file = fileContent._1
      val content = fileContent._2
      content.split("\\r?\\n").map(line => line match {
        case pattern(dt, ev, seq) => Some(LogEntry(<...>))
        case _ => logger.error(s"Cannot parse $file: $line"); None
      })
    })

这给了我一个RDD[Array[Option[LogEntry]]]。有没有一种巧妙的方法来结束LogEntrys 的 RDD?我有点想念它。

我正在考虑改用Try,但我不确定这是否更好。

非常感谢您的想法。

【问题讨论】:

  • 您是否在寻找最终RDD 的签名为RDD[LongEntry]RDD[Array[LogEntry]] 或其他?

标签: scala apache-spark pattern-matching rdd


【解决方案1】:
  1. 要摆脱Array - 只需将map 命令替换为flatMap - flatMap 会将每个记录的Traversable[T] 类型的结果视为T 类型的单独记录。

  2. 要摆脱 Option - collect 只有成功的:entries.collect { case Some(entry) =&gt; entry }。 请注意,collect(p: PartialFunction) 重载(执行与 mapfilter 组合等效的操作)与 collect()(将所有数据发送到驱动程序)非常不同。

总而言之,这将是这样的:

def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[CleaningLogEntry] = {

  val pattern = "<some pattern>".r

  val logs = sc.wholeTextFiles(path, numPartitions)
  val entries = logs.flatMap(fileContent => {
    val file = fileContent._1
    val content = fileContent._2
    content.split("\\r?\\n").map(line => line match {
      case pattern(dt, ev, seq) => Some(LogEntry(<...>))
      case _ => logger.error(s"Cannot parse $file: $line"); None
    })
  })

  entries.collect { case Some(entry) => entry }
}

【讨论】:

  • 太好了,谢谢!我知道这很简单,但不知何故我的大脑今天不合作。 entries.collect()(在您的解决方案中)和 entries.flatMap(e =&gt; e) 之间有区别吗?
  • 我相信没有 - flatMap 可以工作,因为 Option 被隐式转换为 Seq 有 0 或 1 条记录,然后 flatMap 只接受这些记录(如果它们存在的话).. 所以我猜这两个会功能完全一样。
猜你喜欢
  • 2016-03-06
  • 2022-01-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-02-11
  • 1970-01-01
相关资源
最近更新 更多