【问题标题】:Multiline Spark sliding windowMultiline Spark 滑动窗口
【发布时间】:2017-02-15 18:48:09
【问题描述】:

我正在学习使用 Scala 的 Apache Spark,并希望使用它来处理跨越多行的 DNA 数据集,如下所示:

ATGTAT
ACATAT
ATATAT

我想将其映射到固定大小的组 k 并计算这些组。因此,对于 k=3,我们将得到每个字符与接下来的两个字符的组:

ATG TGT GTA TAT ATA TAC 
ACA CAT ATA TAT ATA TAT 
ATA TAT ATA TAT

...然后计算组(如字数):

(ATA,5), (TAT,5), (TAC,1), (ACA,1), (CAT,1), (ATG,1), (TGT,1), (GTA,1)

问题在于“单词”跨越多行,就像上面示例中的TAC 一样。它跨越换行。我不想只计算每一行中的组,而是在整个文件中,忽略行尾。

换句话说,我想将整个序列作为宽度为 k 的滑动窗口处理整个文件,就好像没有换行符一样。问题是当我到达一行末尾时,向前(或向后)查看下一个 RDD 行以完成一个窗口。

我有两个想法:

  1. 从下一行追加 k-1 个字符:
ATATATAC
ACATATAT
ATATAT

我尝试使用 Spark SQL 的 Lead() 函数,但是当我尝试执行 flatMap 时,我得到了 WindowSpec 的 NotSerializableException。有没有其他方法可以引用下一行?我需要编写自定义输入格式吗?

  1. 将整个序列读取为单行(或在读取后连接行):
ATATATACATATATATAT

有没有办法读取多行以便将它们作为一个处理?如果是这样,是否需要全部放入单台机器的内存中?

我意识到其中任何一个都可以作为预处理步骤来完成。我想知道最好的方法是在 Spark 中进行。一旦我拥有这两种格式中的任何一种,我就知道如何做剩下的了,但我被困在这里。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    你可以制作一个单个字符串的rdd而不是将它们连接成一行,因为这会使结果成为一个无法分发的字符串:

    val rdd = sc.textFile("gene.txt")
    // rdd: org.apache.spark.rdd.RDD[String] = gene.txt MapPartitionsRDD[4] at textFile at <console>:24
    

    所以只需使用flatMap 将行拆分为字符列表:

    rdd.flatMap(_.split("")).collect
    // res4: Array[String] = Array(A, T, G, T, A, T, A, C, A, T, A, T, A, T, A, T, A, T)
    

    借用this answer的更完整的解决方案:

    val rdd = sc.textFile("gene.txt")
    
    // create the sliding 3 grams for each partition and record the edges
    val rdd1 = rdd.flatMap(_.split("")).mapPartitionsWithIndex((i, iter) => {
      val slideList = iter.toList.sliding(3).toList
      Iterator((slideList, (slideList.head, slideList.last)))
    })
    
    // collect the edge values, concatenate edges from adjacent partitions and broadcast it
    val edgeValues = rdd1.values.collect
    
    val sewedEdges = edgeValues zip edgeValues.tail map { case (x, y) => {
      (x._2 ++ y._1).drop(1).dropRight(1).sliding(3).toList
    }}
    
    val sewedEdgesMap = sc.broadcast(
      (0 until rdd1.partitions.size) zip sewedEdges toMap
    )
    
    // sew the edge values back to the result
    rdd1.keys.mapPartitionsWithIndex((i, iter) => iter ++ List(sewedEdgesMap.value.getOrElse(i, Nil))).
      flatMap(_.map(_ mkString "")).collect
    
    // res54: Array[String] = Array(ATG, TGT, GTA, TAT, ATA, TAC, ACA, CAT, ATA, TAT, ATA, TAT, ATA, TAT, ATA, TAT)
    

    【讨论】:

    • 我认为问题仍然存在:如何访问当前元素前面两个位置的元素。因此,如果我在第一个元素“A”上,我怎么能向前看,使接下来的两个组成一组:“ATG”?我知道当它在字符串或数组中时,我可以向前看并根据索引进行连接,但是 RDD 行呢?
    • 可以参考this answer
    • 谢谢,这行得通。我必须仔细阅读它才能了解发生了什么。
    猜你喜欢
    • 2019-10-21
    • 1970-01-01
    • 2019-12-28
    • 1970-01-01
    • 1970-01-01
    • 2021-04-30
    • 2016-10-07
    • 2017-04-03
    • 1970-01-01
    相关资源
    最近更新 更多