【问题标题】:Convert JavaDStream<String> to JavaRDD<String>将 JavaDStream<String> 转换为 JavaRDD<String>
【发布时间】:2014-10-31 04:11:58
【问题描述】:

我有一个从外部源获取数据的 JavaDStream。我正在尝试集成 Spark Streaming 和 SparkSQL。众所周知,JavaDStream 是由 JavaRDD 的 .当我有 JavaRDD 时,我只能应用函数 applySchema()。请帮助我将其转换为 JavaRDD。我知道 scala 中有一些函数,而且它更容易。但是请帮助我使用 Java。

【问题讨论】:

    标签: java apache-spark apache-spark-sql


    【解决方案1】:

    您不能将 DStream 转换为 RDD。正如您所提到的, DStream 包含 RDD。访问 RDD 的方法是使用 foreachRDD 对 DStream 的每个 RDD 应用一个函数。查看文档:https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/streaming/api/java/JavaDStreamLike.html#foreachRDD(org.apache.spark.api.java.function.Function2)

    【讨论】:

    • 谢谢。帮了我很多!
    【解决方案2】:

    您必须首先使用 forEachRDD 访问 DStream 中的所有 RDD:

    javaDStream.foreachRDD( rdd => {
        rdd.collect.foreach({
            ...
        })
    })
    

    【讨论】:

      【解决方案3】:

      我希望这有助于将 JavaDstream 转换为 JavaRDD!

          JavaDStream<String> lines = stream.map(ConsumerRecord::value);
      
          //Create JavaRDD<Row>
          lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
              @Override
              public void call(JavaRDD<String> rdd) {
                  JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                      @Override
                      public Row call(String msg) {
                          Row row = RowFactory.create(msg);
                          return row;
                      }
                  });
                  //Create Schema
                  StructType schema = DataTypes.createStructType(new StructField[] {
                          DataTypes.createStructField("value", DataTypes.StringType, true)});
                  //Get Spark 2.0 session
                  SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
                  Dataset msgDataFrame = spark.createDataFrame(rowRDD, schema);
                  msgDataFrame.show();
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-06-30
        • 2019-12-23
        • 2018-11-14
        • 1970-01-01
        • 1970-01-01
        • 2021-07-04
        • 2021-12-21
        • 1970-01-01
        相关资源
        最近更新 更多