【问题标题】:Encoder for Row Type Spark Datasets行类型 Spark 数据集的编码器
【发布时间】:2017-08-31 12:15:42
【问题描述】:

我想为 DataSet 中的 Row 类型编写一个编码器,用于我正在执行的映射操作。本质上,我不明白如何编写编码器。

以下是地图操作的示例:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
            @Override
            public Iterator<String> call(Row row) throws Exception {

                ArrayList<String> obj = //some map operation
                return obj.iterator();
            }
        },Encoders.STRING());

我明白编码器不是字符串,需要写成如下:

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
            //return null;
        }

        @Override
        public ClassTag<Row> clsTag() {
            return null;
        }
    };

但是,我不了解编码器中的 clsTag(),我正在尝试找到一个可以演示类似内容的运行示例(即行类型的编码器)

编辑 - 这不是提到的问题的副本:Encoder error while trying to map dataframe row to updated row 因为答案谈到在 Spark 2.x 中使用 Spark 1.x(我没有这样做),我也在寻找一个编码器行类而不是解决错误。最后,我一直在寻找 Java 中的解决方案,而不是 Scala。

【问题讨论】:

    标签: java apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders


    【解决方案1】:

    答案是使用RowEncoder 和使用StructType 的数据集架构。

    以下是使用数据集进行平面地图操作的工作示例:

        StructType structType = new StructType();
        structType = structType.add("id1", DataTypes.LongType, false);
        structType = structType.add("id2", DataTypes.LongType, false);
    
        ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
    
        Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
            @Override
            public Iterator<Row> call(Row row) throws Exception {
                // a static map operation to demonstrate
                List<Object> data = new ArrayList<>();
                data.add(1l);
                data.add(2l);
                ArrayList<Row> list = new ArrayList<>();
                list.add(RowFactory.create(data.toArray()));
                return list.iterator();
            }
        }, encoder);
    

    【讨论】:

    • 不应该在集群模式下失败,因为 ArrayList 不可序列化
    【解决方案2】:

    我遇到了同样的问题...Encoders.kryo(Row.class)) 为我工作。

    作为奖励,Apache Spark 调优文档引用了 Kryo,因为它的序列化速度“通常高达 10 倍”:

    https://spark.apache.org/docs/latest/tuning.html

    【讨论】:

    • 如果将数据集序列化为 Parquet 文件,这不会破坏列式存储吗?
    • 使用kryo 编码器和RowEncoder 有什么区别,考虑到这两种方法?
    猜你喜欢
    • 2021-01-19
    • 2018-02-28
    • 1970-01-01
    • 2018-10-25
    • 1970-01-01
    • 2021-12-07
    • 1970-01-01
    • 2017-03-23
    • 2017-02-22
    相关资源
    最近更新 更多