【问题标题】:Making predictions using h2o mojo model on spark cluster - parallelisation issues在 spark 集群上使用 h2o mojo 模型进行预测 - 并行化问题
【发布时间】:2020-02-27 18:04:57
【问题描述】:

我在 Spark 集群上使用 h2o 模型(mojo 格式)时遇到问题,但仅当我尝试并行运行它时,而不是当我使用 collect 并在驱动程序上运行它时。

由于我预测的数据帧有 > 100 个特征,我使用以下函数将数据帧行转换为 H2o 的 RowData 格式(来自here):

def rowToRowData(df: DataFrame, row: Row): RowData = {
  val rowAsMap = row.getValuesMap[Any](df.schema.fieldNames)
  val rowData = rowAsMap.foldLeft(new RowData()) { case (rd, (k,v)) =>
    if (v != null) { rd.put(k, v.toString) }
    rd
  }
  rowData
}

然后,我导入 mojo 模型并创建一个 easyPredictModel 包装器

val mojo = MojoModel.load("/path/to/mojo.zip")
val easyModel = new EasyPredictModelWrapper(mojo)

现在,如果我先收集数据,我可以通过映射行来对我的数据框 (df) 进行预测,因此以下工作:

val predictions = df.collect().map { r =>
  val rData = rowToRowData(df, r) . // convert row to RowData using function
  val prediction = easyModel.predictBinomial(rData).label
  (r.getAs[String]("id"), prediction.toInt)
  }
  .toSeq
  .toDF("id", "prediction")

但是,我希望在集群上并行执行此操作,因为最终的 df 太大而无法在驱动程序上收集。但是,如果我尝试在不先收集的情况下运行相同的代码:

val predictions = df.map { r =>
  val rData = rowToRowData(df, r)
  val prediction = easyModel.predictBinomial(rData).label
  (r.getAs[String]("id"), prediction.toInt)
}
  .toDF("id", "prediction")

我收到以下错误:

18/01/03 11:34:59 WARN TaskSetManager: Lost task 0.0 in stage 118.0 (TID 9914, 213.248.241.182, executor 0): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

所以它看起来像数据类型不匹配。我尝试先将数据框转换为 rdd(即df.rdd.map,但得到相同的错误),执行df.mapPartition,或将rowToData 功能代码放在地图中,但到目前为止没有任何效果。

关于实现这一目标的最佳方法的任何想法?

【问题讨论】:

    标签: scala apache-spark h2o


    【解决方案1】:

    我发现一些凌乱的 Spark 票证https://issues.apache.org/jira/browse/SPARK-18075 描述了与提交 Spark 应用程序的不同方式相关的相同问题。看看吧,也许它会给你一个关于你的问题的线索。

    【讨论】:

    • 感谢 Dmitry,但是当我尝试返回一行时,我得到了编译错误:Error:(344, 53) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._Error:(344, 53) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[org.apache.spark.sql.Row])org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]. Unspecified value parameter evidence$6.]
    • 有趣...所以问题不在于返回 Row,您的代码应该可以正常返回元组。我发现一些凌乱的 Spark 票证描述了与提交 Spark 应用程序的不同方式相关的相同问题issues.apache.org/jira/browse/SPARK-18075。看看吧,也许它会给你一个关于你的问题的线索。
    • 嗨@Dmitry - 实际上,返回一个元组完全符合我的目的,因为我可以通过.toDF 调用将其直接转换为DataFrame!谢谢您,如果您编辑游览原始答案,那么我可以接受它作为正确答案。
    【解决方案2】:

    您不能调用 prediction.toInt。返回的预测是一个元组。您需要提取该元组的第二个元素以获得 1 级的实际分数。我在这里有一个完整的示例:https://stackoverflow.com/a/47898040/9120484

    【讨论】:

    • 感谢您的链接。我确实看到了你的答案 - 但我需要一些不涉及预先指定行名和类型的东西。在这种情况下,您可以调用prediction.toInt,因为预测结果不是概率元组,而是标签,或者.predictBinomial(rData).label的结果。
    猜你喜欢
    • 2018-05-29
    • 2019-04-24
    • 2020-03-18
    • 2021-06-08
    • 2020-11-20
    • 2019-07-20
    • 1970-01-01
    • 2017-08-10
    • 2018-08-30
    相关资源
    最近更新 更多