【问题标题】:Create a new Spark Dataset<Row> based on an existing Dataset<Row> and an added HashMap基于现有 Dataset<Row> 和添加的 HashMap 创建新的 Spark Dataset<Row>
【发布时间】:2018-10-02 19:10:45
【问题描述】:

我有一个基于 JSON 数据的Dataset&lt;Row&gt;。现在我想基于初始数据集创建一个新的Dataset&lt;Row&gt;,但添加一个基于 Java HashMap&lt;String, String&gt; 数据类型的列,例如

Dataset<Row> dataset2 = dataset1.withColumn("newColumn", *some way to specify HashMap<String, String> as the added column's datatype*);

使用这个新数据集,我可以创建一个行编码器,例如

ExpressionEncoder<Row> dataset2Encoder = RowEncoder.apply(dataset2.schema());

然后应用映射函数,例如

dataset2 = dataset2.map(new XyzFunction(), dataset2Encoder)

澄清 我的初始数据集基于 JSON 格式的数据。我想要完成的是基于此初始数据集创建一个新数据集,但在 MapFunction 中添加了一个新列。在创建初始数据集时添加列 (withColumn) 的想法将确保我想在 MapFunction 中更新的列存在模式定义。但是,我似乎找不到修改传递给 MapFunction 类的 call(Row arg) 函数的 Row 对象的方法,或者在调用函数中使用 RowFactory.create(...) 创建一个新实例。我希望能够根据传递的 Row-object 的所有现有值和要添加到新行的新 Map 在 MapFunction 中创建一个 Row-instance。然后编码器将从生成的模式中知道这个新的/生成的列。我希望这能澄清我想要完成的工作......

【问题讨论】:

    标签: java apache-spark dataset


    【解决方案1】:

    你可以

    import static org.apache.spark.sql.functions.*;
    import org.apache.spark.sql.types.DataTypes;
    
    df.withColumn("newColumn", lit(null).cast("map<string, string>"));
    

    df.withColumn(
      "newColumn", 
      lit(null).cast(
        DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)
      )
    );
    

    但是为什么要这样间接呢?

    Encoder<Row> enc = RowEncoder.apply(df.schema().add(
      "newColumn",
      DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)
    ));
    

    根据您的具体操作,使用UserDefinedFunction 可能会简单得多,并且可以让您完全跳过Encoders

    【讨论】:

      猜你喜欢
      • 2020-05-20
      • 1970-01-01
      • 2020-10-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多