【Title】: How to extract data from a Spark MLlib FP-Growth model
【Posted】: 2015-11-02 17:25:46
【Description】:

I am running a Spark master and slave in standalone mode, without a Hadoop cluster. Using spark-shell, I can quickly build an FPGrowthModel from my data. Once the model is built, I try to inspect the patterns and frequencies it captured, but with a larger dataset (roughly a 200,000 x 2,000 matrix) Spark hangs on the collect() step (judging by the Spark UI). This is the code I run in spark-shell:

import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}
import org.apache.spark.rdd.RDD

val textFile = sc.textFile("/path/to/txt/file")
val data = textFile.map(_.split(" ")).cache()

val fpg = new FPGrowth().setMinSupport(0.9).setNumPartitions(8)
val model = fpg.run(data)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

I tried increasing the spark-shell memory from 512 MB to 2 GB, but that did not relieve the hang. I am not sure whether Hadoop is required for this task, whether I need to increase the spark-shell memory further, or whether something else is wrong.
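For reference, a sketch of how driver and executor memory can be raised on the spark-shell command line instead of inside the shell (the 4g values here are illustrative, not a recommendation):

```shell
# Give both the driver (where collect() materializes results)
# and the executors a larger heap before starting the shell:
spark-shell --driver-memory 4g --executor-memory 4g
```

The driver memory matters most here, since collect() pulls every frequent itemset back to the driver JVM.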

15/08/10 22:19:40 ERROR TaskSchedulerImpl: Lost executor 0 on 142.103.22.23: remote Rpc client disassociated
15/08/10 22:19:40 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@142.103.22.23:43440] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/0 is now EXITED (Command exited with code 137)
15/08/10 22:19:40 INFO TaskSetManager: Re-queueing tasks for 0 from TaskSet 4.0
15/08/10 22:19:40 INFO SparkDeploySchedulerBackend: Executor app-20150810163957-0001/0 removed: Command exited with code 137
15/08/10 22:19:40 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 59, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 6.0 in stage 4.0 (TID 62, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 56, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 2.0 in stage 4.0 (TID 58, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 5.0 in stage 4.0 (TID 61, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 4.0 in stage 4.0 (TID 60, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 7.0 in stage 4.0 (TID 63, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 57, 142.103.22.23): ExecutorLostFailure (executor 0 lost)
15/08/10 22:19:40 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor added: app-20150810163957-0001/1 on worker-20150810163259-142.103.22.23-48853 (142.103.22.23:48853) with 8 cores
15/08/10 22:19:40 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150810163957-0001/1 on hostPort 142.103.22.23:48853 with 8 cores, 15.0 GB RAM
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/1 is now LOADING
15/08/10 22:19:40 INFO DAGScheduler: Executor lost: 0 (epoch 2)
15/08/10 22:19:40 INFO AppClient$ClientActor: Executor updated: app-20150810163957-0001/1 is now RUNNING
15/08/10 22:19:40 INFO BlockManagerMasterEndpoint: Trying to remove executor 0 from BlockManagerMaster.
15/08/10 22:19:40 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(0, 142.103.22.23, 37411)
15/08/10 22:19:40 INFO BlockManagerMaster: Removed 0 successfully in removeExecutor
15/08/10 22:19:40 INFO ShuffleMapStage: ShuffleMapStage 3 is now unavailable on executor 0 (0/16, false)

【Comments】:

  • Do you get an error message, or does it just hang?

Tags: hadoop apache-spark apache-spark-mllib


【Solution 1】:

If the result is large, say several GB, you should not call .collect(): it materializes the entire RDD in the driver's memory, which is only appropriate for small results. Run the foreach loop without collecting first (the output will then appear in the executors' stdout rather than on the driver).
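A minimal sketch of two alternatives to collect(), assuming the model and output path from the question (the path is illustrative):

```scala
// Option 1: inspect only a small sample on the driver instead of everything.
// take(n) ships just n itemsets to the driver.
model.freqItemsets.take(20).foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

// Option 2: write the full result to disk in parallel,
// without ever materializing it on the driver.
model.freqItemsets
  .map(is => is.items.mkString("[", ",", "]") + ", " + is.freq)
  .saveAsTextFile("/path/to/output")
```

Writing to disk is usually the safer choice for a result set of this size; the sample is enough for a quick sanity check.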

【Comments】:

  • Hi, thanks for the help. I removed the collect() call and found the printed output in the stdout section of the Spark UI, which is a step forward! However, after 6 hours some error messages appeared in the console, and I cannot tell whether the model finished iterating over all the items. I have attached them to the original question. Thanks!
  • You could first try a smaller dataset to see whether it works, and then judge whether it is a resource problem or something else.
  • Thanks! I think my model was simply too large; after filtering the items, the problem was resolved.
【Solution 2】:

Kryo is a faster serializer than org.apache.spark.serializer.JavaSerializer, but it can cause problems with FPGrowth (see the linked question below). One possible workaround is to tell Spark not to use Kryo:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("APP_NAME")
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val sc = new SparkContext(conf)

Then try running the code above again.

See this link for reference:

FPGrowth Algorithm in Spark

【Comments】:

  • Please don't post identical answers to multiple questions: either the answer does not fit them all, or the questions are duplicates and should be flagged/closed as such.
  • Thanks for both comments... I will edit the answer to include the essential part and provide the reference link.
【Solution 3】:

Try replacing collect() with a local iterator. Ultimately, you may be hitting a limitation of the FPGrowth implementation. See my post here and the Spark JIRA issue.
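A sketch of the local-iterator approach, assuming the model from the question:

```scala
// toLocalIterator streams the RDD to the driver one partition at a time,
// so only a single partition (not the whole result) must fit in driver memory.
model.freqItemsets.toLocalIterator.foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}
```

Note that this still brings every itemset to the driver eventually, just not all at once, so it trades memory pressure for a slower, sequential transfer.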

【Comments】:
