【问题标题】:Spark - Java UDF returning multiple columnsSpark - 返回多列的 Java UDF
【发布时间】:2017-02-05 17:55:14
【问题描述】:

我正在使用 sparkSql 1.6.2 (Java API),我必须处理以下 DataFrame,它在 2 列中有一个值列表:

ID  AttributeName AttributeValue
 0  [an1,an2,an3] [av1,av2,av3]
 1  [bn1,bn2]     [bv1,bv2]

想要的表是:

ID  AttributeName AttributeValue
 0  an1           av1
 0  an2           av2
 0  an3           av3
 1  bn1           bv1
 1  bn2           bv2

我想我必须结合使用explode 函数和自定义UDF 函数。

我找到了以下资源:

我可以成功运行一个读取两列并返回列中前两个字符串的串联的示例

 UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
        public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            return col1.apply(0) + col2.apply(0);
        }
    };

 context.udf().register("combineUDF", combineUDF, DataTypes.StringType);

问题是编写返回两列的 UDF 的签名(在 Java 中)。 据我了解,我必须定义一个新的 StructType 如下所示并将其设置为返回类型,但到目前为止我还没有设法让最终代码正常工作

StructType retSchema = new StructType(new StructField[]{
            new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
            new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
        }
    );

context.udf().register("combineUDF", combineUDF, retSchema);

任何帮助将不胜感激。

更新:我正在尝试首先实现 zip(AttributeName,AttributeValue),所以我只需要在 sparkSql 中应用标准的 explode 函数:

ID  AttName_AttValue
 0  [[an1,av1],[an1,av2],[an3,av3]]
 1  [[bn1,bv1],[bn2,bv2]]

我构建了以下 UDF:

UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
        public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            List<List<String>> zipped = new LinkedList<>();

            for (int i = 0, listSize = col1.size(); i < listSize; i++) {
                List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
                zipped.add(subRow);
            }

            return zipped;
        }

    };

但是当我运行代码时

myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);

我收到以下错误消息:

scala.MatchError: [[an1,av1],[an1,av2],[an3,av3]](属于 java.util.LinkedList 类)

看起来组合已正确执行,但返回类型不是 Scala 中的预期类型。

有什么帮助吗?

【问题讨论】:

    标签: java apache-spark apache-spark-sql udf


    【解决方案1】:

    最后我设法得到了我正在寻找的结果,但可能不是以最有效的方式。

    基本上是两步:

    • 两个列表的邮编
    • 按行展开列表

    第一步,我定义了以下 UDF 函数

    UDF2 concatItems = new UDF2<Seq<String>, Seq<String>, Seq<String>>() {
        public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            ArrayList zipped = new ArrayList();
    
            for (int i = 0, listSize = col1.size(); i < listSize; i++) {
                String subRow = col1.apply(i) + ";" + col2.apply(i);
                zipped.add(subRow);
            }
    
            return scala.collection.JavaConversions.asScalaBuffer(zipped);
        }
    
    };
    

    缺少对 SparkSession 的函数注册:

    sparkSession.udf().register("concatItems",concatItems,DataTypes.StringType);
    

    然后我用以下代码调用它:

    DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue"));
    

    在这个阶段,df2 看起来像这样:

    ID  AttName_AttValue
     0  [[an1,av1],[an1,av2],[an3,av3]]
     1  [[bn1,bv1],[bn2,bv2]]
    

    然后我调用了以下 lambda 函数来将列表分解为行:

     DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row"));
    

    在这个阶段,df3 看起来像这样:

    ID  AttName_AttValue
     0  [an1,av1]
     0  [an1,av2]
     0  [an3,av3]
     1  [bn1,bv1]
     1  [bn2,bv2]
    

    最后为了将属性名和值拆分成两个不同的列,我将DataFrame转换成JavaRDD以便使用map函数:

    JavaRDD df3RDD = df3.toJavaRDD().map(
                (Function<Row, Row>) myRow -> {
                    String[] info = String.valueOf(myRow.get(1)).split(",");
                    return RowFactory.create(myRow.get(0), info[0], info[1]);
            }).cache();
    

    如果有人有更好的解决方案,请随时发表评论。 希望对你有帮助。

    【讨论】:

    • 我已经尝试过您的示例并得到:未定义的函数:'myNewFunc'。此函数既不是注册的临时函数,也不是在数据库“默认”中注册的永久函数。
    猜你喜欢
    • 2017-05-11
    • 1970-01-01
    • 1970-01-01
    • 2017-05-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-17
    相关资源
    最近更新 更多