【Title】: Spark: Work around nested RDD
【Posted】: 2015-06-09 16:04:52
【Question】:

There are two tables. The first table has records with two fields, book1 and book2, which are the IDs of books that are usually read together as a pair. The second table has books and readers columns, where books and readers are book and reader IDs respectively. For each reader in the second table I need to find the matching books in the pairs table. For example, if a reader has read books 1, 2 and 3, and we have the pairs (1,7), (6,2) and (4,10), the result list for that reader should contain books 7 and 6.

I first group the books by reader and then iterate over the pairs, trying to match each book of a pair against all books in the user's list:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._


object Simple {

  case class Pair(book1: Int, book2: Int)
  case class Book(book: Int, reader: Int, name:String)

  val pairs = Array(
    Pair(1, 2),
    Pair(1, 3),
    Pair(5, 7)
  )

  val testRecs = Array(
    Book(book = 1, reader = 710, name = "book1"),
    Book(book = 2, reader = 710, name = "book2"),
    Book(book = 3, reader = 710, name = "book3"),
    Book(book = 8, reader = 710, name = "book8"),
    Book(book = 1, reader = 720, name = "book1"),
    Book(book = 2, reader = 720, name = "book2"),
    Book(book = 8, reader = 720, name = "book8"),
    Book(book = 3, reader = 730, name = "book3"),
    Book(book = 8, reader = 740, name = "book8")
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Simple")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val pairsDf = sc.parallelize(pairs).toDF()
    val testData = sc.parallelize(testRecs)

    // *** Group test data by reader
    val testByReader = testData.map(r => (r.reader, r.book))
    val testGroups = testByReader.groupByKey()
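    // NOTE: pairsDf (a DataFrame) is captured and used inside the closure of the
    // following RDD transformation; this is the nested "RDD inside an RDD" access.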
    val x = testGroups.map(tuple => tuple match {
      case(user, bookIter) => matchList(user,pairsDf, bookIter.toList)
    })
    x.foreach(println)
  }

  def matchList(user:Int, df: DataFrame, toMatch: List[Int]) = {
    //val x = df.map(r => (r(0), r(1))) --- This also fails!!
    //x
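    // df.map below is invoked from inside another RDD's transformation, i.e. on an
    // executor, where the DataFrame's driver-side state is null, hence the NPE.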
    val relatedBooks = df.map(r => {
      val book1 = r(0)
      val book2 = r(1)
      val z = toMatch.map(book =>
        if (book == book1)
          List(book2)
        else {
          if (book == book2) List(book1)
          else List()
        } //if
      )
      z.flatMap(identity)
    })
    (user,relatedBooks)
  }
}

This produces a java.lang.NullPointerException (see below). As I understand it, Spark does not support nested RDDs. Please advise another way to solve this task.

...
15/06/09 18:59:25 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/09 18:59:25 INFO AbstractConnector: Started SocketConnector@0.0.0.0:44837
15/06/09 18:59:26 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/09 18:59:26 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
[Stage 0:>                                                          (0 + 0) / 5]15/06/09 18:59:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 5)
java.lang.NullPointerException
    at org.apache.spark.sql.DataFrame.schema(DataFrame.scala:253)
    at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:961)
    at org.apache.spark.sql.DataFrame.map(DataFrame.scala:848)
    at Simple$.matchList(Simple.scala:60)
    at Simple$$anonfun$2.apply(Simple.scala:52)
    at Simple$$anonfun$2.apply(Simple.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

【Question Comments】:

    Tags: nested apache-spark rdd


    【Solution 1】:

    You can create two RDDs, one for the book pairs and one for reader-book, and then join the two RDDs on the book id.

    val bookpair = Array((1,2),(2,4),(3,4),(5,6),(4,6),(7,3))
    val bookpairRdd = sc.parallelize(bookpair)
    val readerbook = Array(("foo",1),("bar",2),("user1",3),("user3",4))
    val readerRdd = sc.parallelize(readerbook).map(x => x.swap)
    val joinedRdd = readerRdd.join(bookpairRdd)
    joinedRdd.foreach(println)
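    // prints (element order may vary):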
    
    (4,(user3,6))
    (3,(user1,4))
    (2,(bar,4))
    (1,(foo,2))
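
    For RDDs of pairs, join always matches on the first tuple element, which is why readerbook is swapped (x.swap) above so that the book id becomes the key.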
    

    【Comments】:

    • What field does the join function use to join the two RDDs when the fields have no names at all?
    • Do you mean that the pair for this reader won't have the reader name?
    • No, I was asking about record field names in general. OK, I see now, it joins by key.
    【Solution 2】:

    As you've noted, we can't nest RDDs. One option is to emit book-user pairs, then join that with the book info, and then group the result by user id (grouping by key is somewhat sketchy, but assuming no user has read so many books that the book info for that user doesn't fit in memory, it should be fine). A sketch of this approach follows.
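
    A minimal sketch under these assumptions, reusing the pairs and testRecs arrays (and the sc context) from the question; the pairsByBook and related names are illustrative, not from the original post:

    // Emit each pair in both directions so a match on either book is found:
    // Pair(1, 2) becomes (1 -> 2) and (2 -> 1).
    val pairsByBook = sc.parallelize(pairs)
      .flatMap(p => Seq((p.book1, p.book2), (p.book2, p.book1)))

    // Key the readers' books by book id, join against the pairs,
    // then regroup the related books by reader.
    val related = sc.parallelize(testRecs)
      .map(b => (b.book, b.reader))
      .join(pairsByBook)                 // (book, (reader, relatedBook))
      .map { case (_, (reader, relatedBook)) => (reader, relatedBook) }
      .groupByKey()                      // (reader, related books)

    related.foreach(println)
    // e.g. (710,CompactBuffer(2, 3, 1, 1)): reader 710 read books 1, 2, 3 and 8,
    // and the pairs (1,2) and (1,3) relate book 1 to 2 and 3, and books 2 and 3 to 1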

    【Comments】:

    • It's not quite clear how Pair(book1: Int, book2: Int) and Book(book: Int, reader: Int, name: String) would be joined. Should there actually be two joins per reader, one on book1 and one on book2? How would these joins be combined?