【问题标题】:How to create a custom Encoder in Spark 2.X Datasets?如何在 Spark 2.X 数据集中创建自定义编码器?
【发布时间】:2016-10-08 22:52:30
【问题描述】:

Spark 数据集从 Row's 移到 Encoder's for Pojo's/primitives。 Catalyst 引擎使用ExpressionEncoder 来转换 SQL 表达式中的列。然而,似乎没有Encoder 的其他子类可用作我们自己实现的模板。

以下是 Spark 1.X / DataFrames 中的代码示例,但在新机制中无法编译:

//mapping each row to RDD tuple
df.map(row => {
    var id: String = if (!has_id) "" else row.getAs[String]("id")
    var label: String = row.getAs[String]("label")
    val channels  : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height  : Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
    val data : Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[Byte@unchecked] => arr
      case _ => {
        log.error("Unsupport value type")
        null
      }
    }
    (id, label, channels, height, width, data)
  }).persist(StorageLevel.DISK_ONLY)

}

我们得到一个编译器错误

Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported 
by importing spark.implicits._  Support for serializing other types will be added in future releases.
    df.map(row => {
          ^

那么在某种程度上/某处应该有办法

  • 定义/实现我们的自定义编码器
  • DataFrame(现在是Row 类型的数据集)上执行映射时应用它
  • 注册编码器以供其他自定义代码使用

我正在寻找能够成功执行这些步骤的代码。

【问题讨论】:

标签: scala apache-spark apache-spark-dataset apache-spark-encoders


【解决方案1】:

您是否导入了隐式编码器?

导入 spark.implicits._

http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Encoder

【讨论】:

    【解决方案2】:

    据我所知,自 1.6 以来并没有真正改变,How to store custom objects in Dataset? 中描述的解决方案是唯一可用的选项。不过,您当前的代码应该可以与产品类型的默认编码器一起正常工作。

    要了解为什么您的代码在 1.x 中有效而在 2.0.0 中可能无效,您必须检查签名。在 1.x 中,DataFrame.map 是一种采用函数 Row => T 并将 RDD[Row] 转换为 RDD[T] 的方法。

    在 2.0.0 中,DataFrame.map 也采用 Row => T 类型的函数,但将 Dataset[Row](又名 DataFrame)转换为 Dataset[T],因此 T 需要 Encoder。如果您想获得“旧”行为,您应该明确使用RDD

    df.rdd.map(row => ???)
    

    对于Dataset[Row]map,请参阅Encoder error while trying to map dataframe row to updated row

    【讨论】:

      【解决方案3】:

      我导入了 spark.implicits._ spark 是 SparkSession,它解决了错误并导入了自定义编码器。

      另外,编写自定义编码器是一种我没有尝试过的方法。

      工作解决方案:- 创建 SparkSession 并导入以下内容

      导入 spark.implicits._

      【讨论】:

        猜你喜欢
        • 2020-08-14
        • 1970-01-01
        • 1970-01-01
        • 2014-03-07
        • 2018-04-04
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-07-29
        相关资源
        最近更新 更多