【问题标题】:Spark with Scala : Filter RDD by values contained in another listSpark with Scala:按另一个列表中包含的值过滤 RDD
【发布时间】:2016-06-24 00:20:59
【问题描述】:

如何按 list.contains() 过滤? 这是我当前的代码,我有一个 Main 类,它从命令行参数获取输入,并根据该输入执行相应的调度程序。在这种情况下,它是一个 RecommendationDispatcher 类,它在构造函数中发挥了所有作用 - 训练模型并为输入的各种用户生成推荐:

import org.apache.commons.lang.StringUtils.indexOfAny
import java.io.{BufferedWriter, File, FileWriter}
import java.text.DecimalFormat
import Util.javaHash
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating


import org.apache.spark.{SparkConf, SparkContext}

class RecommendDispatcher(master:String, inputFile:String, outputFile:String, userList: List[String]) extends java.io.Serializable {

  val format : DecimalFormat = new DecimalFormat("0.#####");
  val file = new File(outputFile)
  val bw = new BufferedWriter(new FileWriter(file))

  val conf = new SparkConf().setAppName("Movies").setMaster(master)
  val sparkContext = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
  val baseRdd = sparkContext.textFile(inputFile)




  val movieIds = baseRdd.map(line => line.split("\\s+")(1)).distinct().map(id => (javaHash(id), id))

  val userIds = baseRdd.map(line => line.split("\\s+")(3)).distinct()
                                        .filter(x => userList.contains(x))
                                        .map(id => (javaHash(id), id))


  val ratings = baseRdd.map(line => line.split("\\s+"))
    .map(tokens => (tokens(3),tokens(1), tokens(tokens.indexOf("review/score:")+1).toDouble))
      .map( x => Rating(javaHash(x._1),javaHash(x._2),x._3))

  // Build the recommendation model using ALS
  val rank = 10
  val numIterations = 10
  val model = ALS.train(ratings, rank, numIterations, 0.01)

  val users = userIds.collect()
  var mids = movieIds.collect()

    usrs.foreach(u => {
      bw.write("Recommendations for " + u + ":\n")
      var ranked = List[(Double, Int)]()
      mids.foreach(x => {
        val movieId = x._1
        val prediction = (model.predict(u._1, movieId), movieId)
        ranked = ranked :+ prediction
      })
      //Sort in descending order
      ranked = ranked.sortBy(x => -1 * x._1)
      ranked.foreach(x => bw.write(x._1 + " ; " + x._2 + "\n"))
    })

  bw.close()

}

这个异常被抛出在“.filter”行:

线程“主”org.apache.spark.SparkException 中的异常:任务不是 可序列化

【问题讨论】:

  • userList的类型是什么?
  • @ethan 任务不可序列化异常是关闭“泄漏”的结果。调试这些类型的错误的最佳方法是使用完整的代码上下文。我建议您将整个代码放在 Databricks 社区版笔记本中,然后在此处共享链接。你可以注册accounts.cloud.databricks.com/registration.html#signup/…

标签: scala apache-spark


【解决方案1】:

我认为一个好方法是将您的userList 转换为broadcast variable

val broadcastUserList= sc.broadcast(userList)
val userIds = baseRdd.map(line => line.split("\\s+")(3)).distinct()
                                      .filter(x => broadcastUserList.value.contains(x))
                                      .map(id => (javaHash(id), id))

【讨论】:

  • 好像没有效果,结果是一样的。
  • 返回输入字符串值的哈希值:id
  • 它是怎么做到的? @Ethan
  • 以下实现不是我的:def javaHash(word: String, seed: Int = 0): Int = { var hash = 0 for (ch <- word.toCharArray) hash = 31 * hash + ch.toInt hash = hash ^ (hash >> 20) ^ (hash >> 12) hash ^ (hash >> 7) ^ (hash >> 4) }
  • @Ethan,编辑您的问题以包含修改后的代码。这个答案应该有效,所以我们需要查看仍然给出错误的代码。
【解决方案2】:

我猜 Sim 关于关闭“泄漏”是正确的,并且您提供的示例代码过于简单。

如果你的主要看起来像这样:

object test
{
  def main(args: Array[String]): Unit = 
  {
    val sc = ...
    val rdd1 = ...
    val userList = ...
    val rdd2 = rdd1.filter { list.contains( _ ) }
  } 
}

然后不会发生序列化错误。可序列化的“userList”,序列化到执行器上没有问题……

当您开始将“大”主模型建模为单独的类时,问题就开始了。

下面是一个可能出错的例子:

class FilterLogic
{
  val userList = List( 1 )  
  def filterRDD( rdd : RDD[ Int ] ) : RDD[ Int ] = 
  {
    rdd.filter { list.contains( _ ) }
  }
}

object Test 
{
  def main(args: Array[String]): Unit = 
  {
    val sc = ...
    val rdd1 = ...
    val rdd2 = new FilterLogic().filterRDD( rdd1 )// This will result in a serialization error!!!
  }
}

现在 userList 是 Logic 类的一个值,当它需要被序列化到执行器时,它要求整个包装 Logic 类也被序列化(为什么?因为在 Scala 中 userList 实际上是 Logic 中的 getter类)。

解决这个问题的几种方法:

1) userList 可以在 filterRDD 函数中创建,那么它不是 Logic 的 val(有效但限制代码共享/建模)

1.1) 类似的想法是在 filterRDD 函数中使用 temp val,如下所示:

val list_ = list ; rdd.filter { list_.contains( _ ) }

有效,但太丑了,几乎是痛苦的......

2) 逻辑类可以序列化(虽然有时可能无法序列化)

最后,使用广播可能有(或没有)它的好处,但它与序列化错误无关。

【讨论】:

    【解决方案3】:

    我尝试序列化 RecommendDispatcher 类,但仍然得到相同的异常。所以我决定将代码放在 Main 类中,这解决了我的问题。

    【讨论】:

      猜你喜欢
      • 2017-08-19
      • 1970-01-01
      • 2015-10-26
      • 1970-01-01
      • 2015-06-27
      • 1970-01-01
      • 2016-12-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多