【问题标题】:From the following code how to convert a JavaRDD<Integer> to DataFrame or DataSet从以下代码如何将 JavaRDD<Integer> 转换为 DataFrame 或 DataSet
【发布时间】:2020-10-02 13:30:50
【问题描述】:
public static void main(String[] args) {
        SparkSession sessn = SparkSession.builder().appName("RDD2DF").master("local").getOrCreate();
        List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
        Dataset<Integer> DF = sessn.createDataset(lst, Encoders.INT());
        System.out.println(DF.javaRDD().getNumPartitions());
        JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());

}

从上面的代码中,我无法将 JavaRdd (mappartRdd) 转换为 Java Spark 中的 DataFrame。 我正在使用以下方法将 JavaRdd 转换为 DataFrame/DataSet。

sessn.createDataFrame(mappartRdd, beanClass);

我为 createDataFrame 尝试了多个选项和不同的重载函数。我面临将其转换为 DF 的问题。我需要提供什么 beanclass 才能使代码正常工作?

与 scala 不同,Java 中没有像 toDF() 这样的函数来将 RDD 转换为 DataFrame。有人可以根据我的要求协助转换它。

注意:我可以通过修改上面的代码直接创建一个数据集,如下所示。

Dataset<Integer> mappartDS = DF.repartition(3).mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator(), Encoders.INT());

但我想知道如果我使用 createDataFrame,为什么我的 JavaRdd 没有转换为 DF/DS。任何帮助将不胜感激。

【问题讨论】:

    标签: apache-spark apache-spark-sql rdd sparkcore


    【解决方案1】:

    这似乎是this SO Question的后续行动

    我想,你正处于学习火花的阶段。我建议了解提供的 java api - https://spark.apache.org/docs/latest/api/java/index.html

    关于你的问题,如果你检查createDataFrame api,它如下-

     def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
    ...
    }
    

    如您所见,它将JavaRDD[Row] 和相关的StructType 架构作为参数。因此创建DataFrame 等于Dataset&lt;Row&gt; 使用下面的sn-p-

    JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());
    
      StructType schema = new StructType()
                    .add(new StructField("value", DataTypes.IntegerType, true, Metadata.empty()));
            Dataset<Row> df = spark.createDataFrame(mappartRdd.map(RowFactory::create), schema);
            df.show(false);
            df.printSchema();
    
            /**
             * +-----+
             * |value|
             * +-----+
             * |6    |
             * |8    |
             * |6    |
             * +-----+
             *
             * root
             *  |-- value: integer (nullable = true)
             */
    

    【讨论】:

    • :解决方案确实有帮助。我知道你来自哪里。但这是期望 JavaRDD 的重载函数之一。 createDataFrame(JavaRDD rowRDD, StructType 模式)。但是还有另外一个函数,它以 Generic RDD 作为参数,如下所示。 createDataFrame(JavaRDD> rdd, Class> beanClass) 。这是我被困在什么需要作为 bean 类传递的地方。如果你知道应该通过什么。请帮忙。但除此之外,我对提供的解决方案感到满意。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-09
    • 2017-04-03
    • 2017-04-17
    • 2022-07-28
    • 2021-04-12
    相关资源
    最近更新 更多