【问题标题】:Apache Spark 2.3.0 - How to merge two array<string> into one array<string>Apache Spark 2.3.0 - 如何将两个数组<string> 合并为一个数组<string>
【发布时间】:2021-11-15 14:22:56
【问题描述】:

我尝试了谷歌搜索,但无法找到解决方案。 在 2.4.x 版本中将两个 array&lt;string&gt; 合并为一个 array&lt;string&gt; 是可能且容易的,但在 2.3.0 版本中找不到方法。

输入 -

[[one, two, three], [four, five, six]]

预期输出 -

[ one, two, three, four, five, six]

谁能解释一下如何在 Spark 中使用 Java 实现这一点?

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-java


    【解决方案1】:

    由于 Spark 2.4 中引入了新的数组函数,您必须转到 user-defined function (udf)。

    java 中的用户定义函数是具有apply 方法的java 对象,可用作数据帧转换中的内置函数。要创建这样的对象,首先要创建一个UDFx 对象,其中x 是您的udf 的参数数量。

    然后您从这个UDFx 对象创建您的udf,方法是使用方法sparkSession.sqlContext().register().udf()(Spark 2.3 之前唯一可用的方法)注册它,或者使用函数udf(适用于Spark 2.3 及更高版本)创建它,如所述在this answer

    最后你将它与函数callUdf 一起使用或直接使用apply。所以 Spark 2.3 及以上版本的完整代码如下:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.expressions.UserDefinedFunction;
    import org.apache.spark.sql.types.DataTypes;
    import scala.collection.Seq;
    
    import java.util.stream.Collectors;
    
    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.udf;
    import static scala.collection.JavaConverters.asScalaBuffer;
    import static scala.collection.JavaConverters.seqAsJavaList;
    
    
    public class Flattener {
    
      public static Dataset<Row> flattenArray(Dataset<Row> input, String columnName) {
        // define what your user-defined function do
        UDF1<Seq<Seq<String>>, Seq<String>> flattenArray = new UDF1<Seq<Seq<String>>, Seq<String>>() {
          @Override
          public Seq<String> call(Seq<Seq<String>> s) {
            return asScalaBuffer(
              seqAsJavaList(s)
                .stream()
                .flatMap(x -> seqAsJavaList(x).stream())
                .collect(Collectors.toList())
            ).toSeq();
          }
        };
    
        // convert it to user-defined function
        UserDefinedFunction flatten_array = udf(
          flattenArray, 
          DataTypes.createArrayType(DataTypes.StringType) // output type of your UDF
        );
    
        // apply your user-defined function
        return input.withColumn(columnName, flatten_array.apply(col(columnName)));
      }
    }
    

    注意:在 java UDF 上使用序列时,您需要使用 Scala Seq 而不是 java List 作为序列输入。要从一个转换到另一个,请查看JavaConverters scala 类方法。

    然后您可以在数据框上调用 flattenArray 方法:

    Flattener.flattenArray(dataframe, "name_of_column_you_want_to_flatten");
    

    【讨论】:

      猜你喜欢
      • 2014-04-26
      • 1970-01-01
      • 2016-10-25
      • 1970-01-01
      • 2021-07-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多