【问题标题】:Convert a JavaRDD String to JavaRDD Vector将 JavaRDD 字符串转换为 JavaRDD 向量
【发布时间】:2017-03-03 04:19:08
【问题描述】:

我正在尝试将 csv 文件加载为 JavaRDD 字符串,然后想在 JavaRDD 向量中获取数据

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.feature.HashingTF;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary;
import org.apache.spark.mllib.stat.Statistics;

import breeze.collection.mutable.SparseArray;
import scala.collection.immutable.Seq;




public class Trial {
    public void start() throws InstantiationException, IllegalAccessException,
    ClassNotFoundException {

        run();
    }


    private void run(){
SparkConf conf = new SparkConf().setAppName("csvparser");
JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> data = jsc.textFile("C:/Users/kalraa2/Documents/trial.csv");
JavaRDD<Vector> datamain = data.flatMap(null);
MultivariateStatisticalSummary mat = Statistics.colStats(datamain.rdd());

        System.out.println(mat.mean());


    }

    private List<Vector> Seq(Vector dv) {
        // TODO Auto-generated method stub
        return null;
    }


    public static void main(String[] args) throws Exception {

        Trial trial = new Trial();
        trial.start();
    }
}

该程序运行时没有任何错误,但尝试在 spark-machine 上运行它时我什么也得不到。谁能告诉我字符串RDD到Vector RDD的转换是否正确。

我的 csv 文件只包含一列是浮点数

【问题讨论】:

    标签: java apache-spark apache-spark-mllib


    【解决方案1】:

    flatMap 调用中的 null 可能有问题:

    JavaRDD&lt;Vector&gt; datamain = data.flatMap(null);

    【讨论】:

    • 解决办法是什么?
    • 很难说,因为我不知道你想用它做什么。我对MultivariateStatisticalSummary 一无所知。尝试在单元测试中运行它,这很容易,您会收到错误消息。当然,您需要为 flatMap 提供一些功能,但我不知道是哪种。
    【解决方案2】:

    我通过将代码更改为此解决了我的答案

    JavaRDD<Vector> datamain = data.map(new Function<String,Vector>(){
                public Vector call(String s){
                    String[] sarray = s.trim().split("\\r?\\n");
                    double[] values = new double[sarray.length];
                    for (int i = 0; i < sarray.length; i++) {
                      values[i] = Double.parseDouble(sarray[i]);
                      System.out.println(values[i]);
                    }
                    return Vectors.dense(values);  
                    }
                }
            );
    

    【讨论】:

      【解决方案3】:

      假设您的 trial.csv 文件如下所示

      1.0
      2.0
      3.0
      

      从您的问题中获取原始代码,Java 8 需要进行一行更改

      SparkConf conf = new SparkConf().setAppName("csvparser").setMaster("local");
      JavaSparkContext jsc = new JavaSparkContext(conf);
      JavaRDD<String> data = jsc.textFile("C:/Users/kalraa2/Documents/trial.csv");
      JavaRDD<Vector> datamain = data.map(s -> Vectors.dense(Double.parseDouble(s)));
      MultivariateStatisticalSummary mat = Statistics.colStats(datamain.rdd());
      
      System.out.println(mat.mean());
      

      打印2.0

      【讨论】:

        猜你喜欢
        • 2018-11-14
        • 2016-07-24
        • 2020-06-01
        • 2017-02-25
        • 2019-12-23
        • 2016-01-05
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多