【问题标题】:OutOfBoundsException with ALS - Flink MLlibALS 的 OutOfBoundsException - Flink MLlib
【发布时间】:2015-09-05 12:14:36
【问题描述】:

我正在使用此处提供的 MovieLens 数据集为电影制作推荐系统: http://grouplens.org/datasets/movielens/

为了计算这个推荐系统,我在 scala 中使用了 Flink 的 ML 库,特别是 ALS 算法 (org.apache.flink.ml.recommendation.ALS)。

我首先将电影的评分映射到DataSet[(Int, Int, Double)],然后创建trainingSettestSet(参见下面的代码)。

我的问题是,当我将 ALS.fit 函数与整个数据集(所有评级)一起使用时没有错误,但如果我只删除一个评级,拟合函数不再起作用,并且我不明白为什么。

你有什么想法吗? :)

使用的代码:

Rating.scala

case class Rating(userId: Int, movieId: Int, rating: Double)

PreProcessing.scala

object PreProcessing {

def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
      env.readCsvFile[(Int, Int, Double)](
      ratingsPath, ignoreFirstLine = true,
      includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}

Processing.scala

object Processing {
  private val ratingsPath: String = "Path_to_ratings.csv"

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)

    val trainingSet : DataSet[(Int, Int, Double)] =
    ratings
     .map(r => (r.userId, r.movieId, r.rating))
     .sortPartition(0, Order.ASCENDING)
     .first(ratings.count().toInt)

    val als = ALS()
     .setIterations(10)
     .setNumFactors(10)
     .setBlocks(150)
     .setTemporaryPath("/tmp/tmpALS")

    val parameters = ParameterMap()
     .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
     .add(ALS.Seed, 42L)

    als.fit(trainingSet, parameters)
  }
}

“但如果我只删除一个评分”

val trainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .sortPartition(0, Order.ASCENDING)
    .first((ratings.count()-1).toInt)

错误:

06/19/2015 15:00:24 CoGroup(在 org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) 切换到 FAILED

java.lang.ArrayIndexOutOfBoundsException: 5

在 org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)

在 org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)

在 org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)

...

【问题讨论】:

    标签: scala indexoutofboundsexception recommendation-engine apache-flink flinkml


    【解决方案1】:

    问题在于 first 运算符与 Flink 的 ALS 实现的 setTemporaryPath 参数结合使用。为了理解问题,让我快速解释阻塞 ALS 算法的工作原理。

    交替最小二乘法的分块实现首先将给定的评分矩阵按用户和按项目划分为块。对于这些块,计算路由信息。该路由信息表示哪个用户/项目块分别从哪个项目/用户块接收哪个输入。之后,ALS 迭代开始。

    由于 Flink 的底层执行引擎是一个并行流数据流引擎,它尝试以流水线方式执行尽可能多的数据流部分。这需要让管道的​​所有运营商同时在线。这样做的好处是 Flink 避免了实现可能非常大的中间结果。缺点是可用内存必须在所有运行的算子之间共享。在 ALS 的情况下,单个 DataSet 元素(例如用户/项目块)的大小相当大,这是不希望的。

    为了解决这个问题,如果你设置了temporaryPath,并不是所有实现的操作符都会同时执行。路径定义了可以存储中间结果的位置。因此,如果您定义了一个临时路径,那么ALS 首先计算用户块的路由信息​​并将它们写入磁盘,然后计算项目块的路由信息​​并将它们写入磁盘,最后但并非最不重要它开始 ALS 迭代,它从临时路径中读取路由信息。

    用户和项目块的路由信息​​的计算都取决于给定的评分数据集。在您的情况下,当您计算用户路由信息时,它将首先读取评级数据集并对其应用first 运算符。 first 运算符从基础数据集中返回 n-arbitrary 元素。现在的问题是,Flink 没有存储这个first 操作的结果,用于计算item 路由信息。相反,当你开始计算 item 的路由信息​​时,Flink 会从源头开始重新执行数据流。这意味着它从磁盘读取评级数据集并再次对其应用first 运算符。在许多情况下,与第一次 first 操作的结果相比,这会给您一组不同的评级。所以生成的路由信息​​不一致,ALS失败。

    您可以通过具体化first 运算符的结果并将此结果用作ALS 算法的输入来规避该问题。对象FlinkMLTools 包含一个方法persist,它接受DataSet,将其写入给定路径,然后返回一个新的DataSet,它读取刚刚写入的DataSet。这允许您分解生成的数据流图。

    val firstTrainingSet : DataSet[(Int, Int, Double)] =
      ratings
        .map(r => (r.userId, r.movieId, r.rating))
        .first((ratings.count()-1).toInt)
    
    val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")
    
    val als = ALS()
      .setIterations(10)
      .setNumFactors(10)
      .setBlocks(150)
      .setTemporaryPath("/tmp/tmpALS/")
    
    val parameters = ParameterMap()
      .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
      .add(ALS.Seed, 42L)
    
    als.fit(trainingSet, parameters)
    

    或者,您可以尝试不设置temporaryPath。然后以流水线方式执行所有步骤(路由信息计算和 als 迭代)。这意味着用户和项目路由信息计算都使用由first 运算符生成的相同输入数据集。

    Flink 社区目前正在努力将算子的中间结果保存在内存中。这将允许固定 first 运算符的结果,这样它就不会被计算两次,因此,由于其不确定性,不会给出不同的结果。

    【讨论】:

    • 谢谢!完美运行!只是一个问题:当您使用persist 时,您设置了文件的路径。但是ALS 怎么知道这个持久文件是给它的,而不是给程序的其他部分的(例如,如果我们必须persist 用于其他算法)?
    • persist 函数调用会触发数据流的执行到此为止。结果,存储在同一路径下的任何文件都将被覆盖。只有在您触发作业的后续部分时才会读取结果。这意味着您理论上可以同时删除或覆盖此文件。因此,您应该尝试在算法范围内分配唯一的文件名。或者,您也可以创建碰撞概率较低的随机文件名。
    猜你喜欢
    • 2017-11-03
    • 2016-11-08
    • 1970-01-01
    • 2018-07-19
    • 1970-01-01
    • 2021-05-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多