【问题标题】:Spark binary column split into multiple columnsSpark二进制列拆分为多列
【发布时间】:2021-05-02 15:28:11
【问题描述】:

我正在编写一个 Java 应用程序。我有一个火花Dataset<MyObject>,它产生一个二进制类型的列:

Dataset<MyObject> dataset = sparkSession.createDataset(someRDD, Encoders.javaSerialization(MyObject.class));
dataset.printSchema();

//root
//|-- value: binary (nullable = true)

MyObject 有不同的(嵌套)字段,我想在我的数据集中的多个列中“分解”它们。新列还需要根据MyObject 中的多个属性计算。作为解决方案,我可以使用 .withColumn() 并应用 UDF。不幸的是,我不知道如何在 UDF 中接受二进制类型,然后将其转换为 MyObject。关于如何做到这一点的任何建议?

【问题讨论】:

  • 二进制表示为字节数组,您可以尝试使用 byte[] 作为 UDF 的输入类型。有关如何从 UDF 返回复杂类型的信息,请参阅此 post

标签: java apache-spark apache-spark-dataset


【解决方案1】:

感谢blackbishop的建议,我解决了。这是完整的解决方案:

您需要注册 UDF:

UDFRegistration udfRegistration = sparkSession.sqlContext().udf();
udfRegistration.register("extractSomeLong", extractSomeLong(), DataTypes.LongType);

声明并实施 UDF。第一个参数必须是 byte[] 并且您需要将字节数组转换为您的对象,如下所示:

private static UDF1<byte[], Long> extractSomeLong() {
    return (byteArray) -> {
        if (byteArray != null) {
            ByteArrayInputStream in = new ByteArrayInputStream(byteArray);
            ObjectInputStream is = new ObjectInputStream(in);
            MyObject traceWritable = (MyObject) is.readObject();
            return traceWritable.getSomeLong();
        }
        else {
            return -1L;
        }
    };
}

最后它可以用于:

Dataset<MyObject> data = sparkSession.createDataset(someRDD, Encoders.javaSerialization(MyObject.class));
Dataset<Row> processedData = data.withColumn( "ID", functions.callUDF( "extractSomeLong", new Column("columnName")))

【讨论】:

  • 如果需要提取多个字段,我认为可以返回 map 或 struct 类型。比使用特定的 UDF 提取每个更好:)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-03-11
  • 2020-04-24
  • 1970-01-01
  • 2015-07-11
  • 2017-01-07
相关资源
最近更新 更多