对于那些想要一些 Java 代码而不是 Scala 的人:
JavaPairInputDStream<LongWritable, Text> textFileStream =
jsc.fileStream(
inputPath,
LongWritable.class,
Text.class,
TextInputFormat.class,
FileInputDStream::defaultFilter,
false
);
JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> {
UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();
List<RDD<Tuple2<LongWritable, Text>>> deps = JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();
List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map( depRdd -> {
if (depRdd.isEmpty()) {
return null;
}
JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
String filename = depRdd.name();
JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd).mapToPair(t -> new Tuple2<String, String>(filename, t._2().toString())).setName(filename);
return newDep.rdd();
}).filter(t -> t != null).collect(Collectors.toList());
Seq<RDD<Tuple2<String, String>>> rddSeq = JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();
ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
return new UnionRDD<Tuple2<String, String>>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD();
});