【问题标题】:How to convert a Spark DataFrame to RDD of POJOs in Java如何在 Java 中将 Spark DataFrame 转换为 POJO 的 RDD
【发布时间】:2025-12-23 22:10:12
【问题描述】:

我对 Spark 很陌生。我想将 DataFrame 转换为 POJO 的 RDD。喜欢:

JavaRDD<POJOClass> data = df.toJavaRDD();

其中 df 是一个 DataFrame。

df.show() 给出:

+---------+---------+---------+---------+                                       
|    A    |    B    |    C    |    D    |
+---------+---------+---------+---------+
|603300042|     1025|        2|127000948|
|603303766|     1112|        2|127000364|
|603302691|     1184|        2|127000853|
|603303766|     1112|        2|127000364|
|603302691|     1184|        2|127000853|
|603303766|     1112|        2|127000364|
|603303787|     1041|        2|137000323|
|603306351|     1041|        2|137000468|
|603304009|     1307|        2|137000788|
|603303830|     1041|        2|137000012|
|603301119|     1002|        2|137000369|
|603301507|     1188|        2|137001568|
|603302168|     1041|        2|137000468|
+---------+---------+---------+---------+

我的POJO类如下:

public static class POJOClass {
        public Long A;
        public Integer B;
        public Integer C;
        public Long D;
}

我知道

JavaRDD<Row> data = df.toJavaRDD();

效果很好。但是有什么解决方法可以解决我想要实现的目标吗?

【问题讨论】:

    标签: java apache-spark dataframe rdd


    【解决方案1】:

    你可以使用数据集

    public static class POJOClass implements serializable{
            public Long A;
            public Integer B;
            public Integer C;
            public Long D;
    }
    
         Dataset<POJOClass> pojos = context.read().json("/data.json").as(Encoders.bean(POJOClass.class)); 
    

    【讨论】:

      【解决方案2】:

      试试这个(未测试):

          JavaPairRDD<Long, POJOClass> jpRDD = jdbcDF.toJavaRDD().mapToPair(new PairFunction<Row, Long, POJOClass>() {
      
              public Tuple2<Long, POJOClass> call(Row row) throws Exception {
                  POJOClass yourPojo = new POJOClass();
                  // Fill your pojo using row.get(index) 
                  return new Tuple2<Long, POJOClass>( anIndex , (POJOClass) yourPojo);
      
              }
      
          }); 
      

      【讨论】:

        【解决方案3】:

        您可以使用如下地图功能。

        import org.apache.spark.api.java.function.Function;
        
        JavaRDD<POJOClass> data = df.toJavaRDD().map(new Function<Row, POJOClass>() {
                @Override
                public POJOClass call(Row row) {
                POJOClass pojo = new POJOClass();
                pojo.setA(row.getLong(0));
                pojo.setB(row.getInt(1));
                pojo.setC(row.getInt(2));
                pojo.setD(row.getLong(3));
                return pojo;
            }
        });
        

        【讨论】: