【问题标题】:Converting Spark Scala Dataframe Column to Byte Array将 Spark Scala 数据帧列转换为字节数组
【发布时间】:2018-07-16 00:45:18
【问题描述】:

我正在尝试将 Spark Scala DataFrame Column 编写为字节数组。 我有一个由两列组成的 DataFrame。第一列是一个字符串,第二列是从 Strings 到 Longs 的 Map。

例如,

user_id | map
"ac2"   | Map("c2" -> 1, "b3" -> 5)

我想将映射列写为字节数组。到目前为止,我已经尝试将 Jackson 与以下 UDF 一起使用:

val writeJackson = udf { x: Map[String, Long] =>
    jacksonWriter.writeValueAsBytes(x)
}

val df2 = df.withColumn("jacksonMap", writeJackson($"map"))

但这失败了,因为

java.io.NotSerializableException: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer

有没有办法让它与 Jackson 一起工作,如果没有,是否有不同的库可以让我将此 Spark 列写为字节数组?

【问题讨论】:

    标签: scala apache-spark jackson


    【解决方案1】:

    我能够转换为ByteArray 并使用以下代码获取输出。使用火花 1.6.2。

    object DF {
    
      def main(args: Array[String]): Unit = {
    
        val mapper: ObjectMapper = new ObjectMapper
        mapper.registerModule(DefaultScalaModule)
    
        val df = Seq(
          ("ac2", Map("c2" -> 1, "b3" -> 5))
        ).toDF("id", "map")
    
        df.show(false)
        //output
        // +---+---------------------+
        // |id |map                  |
        // +---+---------------------+
        // |ac2|Map(c2 -> 1, b3 -> 5)|
        // +---+---------------------+
        val getByteArray = udf((map: Map[String, Int]) => mapper.writeValueAsBytes(map))
    
        df.withColumn("bytearray", getByteArray($"map")).show(false)
    
        //output
        // +---+---------------------+----------------------------------------------+
        // |id |map                  |bytearray                                     |
        // +---+---------------------+----------------------------------------------+
        // |ac2|Map(c2 -> 1, b3 -> 5)|[7B 22 63 32 22 3A 31 2C 22 62 33 22 3A 35 7D]|
        // +---+---------------------+----------------------------------------------+
      }
    }
    

    【讨论】:

    • 你如何导入ObjectMapper ?什么是DefaultScalaModule
    • 将此添加到pom.xml <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.5</version> </dependency>
    • 谢谢你。我可以使用这个 UDF 将 Spark ArrayType Column 转换为 ByteArray,而不是 Map 吗?
    • 是的,您可以将 UDF 中的类型更改为适合您的类型。
    • val convertToByteArray = udf((map: Array[String]) => mapper.writeValueAsBytes(map)) val arrayDF = Seq( ("x100", Array("p1","p2","p3")) ).toDF("id", "myarray") arrayDF.withColumn("bytearray", convertToByteArray($"myarray")).show(false) 这给我一个错误Exception in thread "main" org.apache.spark.SparkException: Failed to execute user defined function(anonfun$3: (array<string>) => binary)
    猜你喜欢
    • 1970-01-01
    • 2021-06-09
    • 1970-01-01
    • 2018-12-05
    • 1970-01-01
    • 1970-01-01
    • 2020-01-31
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多