【Title】: Convert JavaRDD&lt;Row&gt; to JavaRDD&lt;Vector&gt;
【Posted】: 2016-07-24 11:25:08
【Question】:

I am trying to run LDA on a Wikipedia XML dump. After getting an RDD of the raw text, I create a DataFrame and transform it through a Tokenizer, StopWordsRemover, and CountVectorizer pipeline. I intend to pass the RDD of vectors output by the CountVectorizer to the OnlineLDA in MLlib. Here is my code:

 // Configure an ML pipeline
 RegexTokenizer tokenizer = new RegexTokenizer()
   .setInputCol("text")
   .setOutputCol("words");

 StopWordsRemover remover = new StopWordsRemover()
          .setInputCol("words")
          .setOutputCol("filtered");

 CountVectorizer cv = new CountVectorizer()
          .setVocabSize(vocabSize)
          .setInputCol("filtered")
          .setOutputCol("features");

 Pipeline pipeline = new Pipeline()
          .setStages(new PipelineStage[] {tokenizer, remover, cv});

// Fit the pipeline to train documents.
 PipelineModel model = pipeline.fit(fileDF);

 JavaRDD<Vector> countVectors = model.transform(fileDF)
          .select("features").toJavaRDD()
          .map(new Function<Row, Vector>() {
            public Vector call(Row row) throws Exception {
                Object[] arr = row.getList(0).toArray();

                double[] features = new double[arr.length];
                int i = 0;
                for(Object obj : arr){
                    features[i++] = (double)obj;
                }
                return Vectors.dense(features);
            }
          });

I get a ClassCastException because of this line:

Object[] arr = row.getList(0).toArray();


Caused by: java.lang.ClassCastException: org.apache.spark.mllib.linalg.SparseVector cannot be cast to scala.collection.Seq
at org.apache.spark.sql.Row$class.getSeq(Row.scala:278)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:192)
at org.apache.spark.sql.Row$class.getList(Row.scala:286)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getList(rows.scala:192)
at xmlProcess.ParseXML$2.call(ParseXML.java:142)
at xmlProcess.ParseXML$2.call(ParseXML.java:1)

I found the Scala syntax for doing this here, but I can't find any example of doing it in Java. I tried row.getAs[Vector](0), but that is Scala-only syntax. Is there any way to do this in Java?

【Comments】:

    Tags: java apache-spark spark-dataframe apache-spark-mllib


    【Solution 1】:

    So I was able to do this with a simple cast to Vector. I don't know why I didn't try the simple thing first!

             JavaRDD<Vector> countVectors = model.transform(fileDF)
                  .select("features").toJavaRDD()
                  .map(new Function<Row, Vector>() {
                    public Vector call(Row row) throws Exception {
                        return (Vector)row.get(0);
                    }
                  });
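
    Equivalently, with Java 8 lambdas (a minimal sketch of the same cast; note that in Spark 2.x the pipeline emits org.apache.spark.ml.linalg.Vector instead, so there you would cast to that type and convert with org.apache.spark.mllib.linalg.Vectors.fromML before handing the vectors to MLlib):

             JavaRDD<Vector> countVectors = model.transform(fileDF)
                  .select("features").toJavaRDD()
                  // The "features" column holds a Vector, so a plain cast is enough
                  .map(row -> (Vector) row.get(0));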
    

    【Comments】:

      【Solution 2】:

      You don't need to convert the DataFrame/Dataset to a JavaRDD to use it with LDA. After a few hours of fiddling, I finally got the native RDDs working in Scala.
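
      For illustration, here is a minimal Java sketch of that first point, assuming Spark 2.x, where org.apache.spark.ml.clustering.LDA consumes the DataFrame's features column directly (numTopics and maxIterations are assumed to be defined, as in the code below):

      import org.apache.spark.ml.clustering.LDA;
      import org.apache.spark.ml.clustering.LDAModel;

      // The ml (DataFrame-based) LDA runs on the pipeline output itself;
      // no RDD conversion is needed.
      LDA lda = new LDA()
              .setK(numTopics)
              .setMaxIter(maxIterations)
              .setFeaturesCol("features");
      LDAModel ldaModel = lda.fit(model.transform(fileDF));

      If you do want the RDD-based mllib LDA with the OnlineLDAOptimizer, the Scala route below works.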

      Relevant imports:

      import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
      import org.apache.spark.ml.linalg.{Vector => MLVector}
      import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
      import org.apache.spark.mllib.linalg.Vectors
      import org.apache.spark.sql.{Row, SparkSession}
      

      The code snippet, which follows this example, is:

      val cvModel = new CountVectorizer()
              .setInputCol("filtered")
              .setOutputCol("features")
              .setVocabSize(vocabSize)
              .fit(filteredTokens)
      
      
      val countVectors = cvModel
              .transform(filteredTokens)
              .select("docId","features")
              .rdd.map { case Row(docId: String, features: MLVector) => 
                         (docId.toLong, Vectors.fromML(features)) 
                       }
      val mbf = {
    // Add (1.0 / corpusSize) to MiniBatchFraction to be more robust on tiny datasets.
          val corpusSize = countVectors.count()
          2.0 / maxIterations + 1.0 / corpusSize
        }
        val lda = new LDA()
          .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf)))
          .setK(numTopics)
          .setMaxIterations(2)
          .setDocConcentration(-1) // use default symmetric document-topic prior
          .setTopicConcentration(-1) // use default symmetric topic-word prior
      
        val startTime = System.nanoTime()
        val ldaModel = lda.run(countVectors)
        val elapsed = (System.nanoTime() - startTime) / 1e9
      
        /**
          * Print results.
          */
        // Print training time
        println(s"Finished training LDA model.  Summary:")
        println(s"Training time (sec)\t$elapsed")
        println(s"==========")
      

      Credit to the author of the code here.
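
      For Java users, a rough equivalent of the key conversion step above (a sketch under the same Spark 2.x assumptions, reusing the docId/features columns from the snippet; mllib's LDA.run also accepts a JavaPairRDD&lt;Long, Vector&gt;):

      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.mllib.linalg.Vector;
      import org.apache.spark.mllib.linalg.Vectors;
      import scala.Tuple2;

      JavaPairRDD<Long, Vector> countVectors = cvModel
              .transform(filteredTokens)
              .select("docId", "features")
              .toJavaRDD()
              // Convert each ml.linalg.Vector into an mllib Vector for the RDD-based LDA
              .mapToPair(row -> new Tuple2<>(
                      Long.parseLong(row.getString(0)),
                      Vectors.fromML((org.apache.spark.ml.linalg.Vector) row.get(1))));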

      【Comments】:
