【问题标题】:Convert JavaDStream<String> to JavaRDD<String>将 JavaDStream<String> 转换为 JavaRDD<String>
【发布时间】:2014-10-31 04:11:58
【问题描述】:
我有一个从外部源获取数据的 JavaDStream。我正在尝试集成 Spark Streaming 和 SparkSQL。众所周知,JavaDStream 是由 JavaRDD 的 .当我有 JavaRDD 时,我只能应用函数 applySchema()。请帮助我将其转换为 JavaRDD。我知道 scala 中有一些函数,而且它更容易。但是请帮助我使用 Java。
【问题讨论】:
标签:
java
apache-spark
apache-spark-sql
【解决方案2】:
您必须首先使用 forEachRDD 访问 DStream 中的所有 RDD:
javaDStream.foreachRDD( rdd => {
rdd.collect.foreach({
...
})
})
【解决方案3】:
我希望这有助于将 JavaDstream 转换为 JavaRDD!
JavaDStream<String> lines = stream.map(ConsumerRecord::value);
//Create JavaRDD<Row>
lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
@Override
public Row call(String msg) {
Row row = RowFactory.create(msg);
return row;
}
});
//Create Schema
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("value", DataTypes.StringType, true)});
//Get Spark 2.0 session
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset msgDataFrame = spark.createDataFrame(rowRDD, schema);
msgDataFrame.show();