【发布时间】:2018-04-07 07:29:17
【问题描述】:
我是 spark 流和弹性搜索的新手,我正在尝试使用 spark 从 kafka 主题中读取数据并将数据存储为 rdd。在 rdd 中,我想在新数据到来时附加时间戳,然后推送到 elasticsearch。
lines.foreachRDD(rdd -> {
if(!rdd.isEmpty()){
// rdd.collect().forEach(System.out::println);
String timeStamp = new
SimpleDateFormat("yyyy::MM::dd::HH::mm::ss").format(new Date());
List<String> myList = new ArrayList<String>(Arrays.asList(timeStamp.split("\\s+")));
List<String> f = rdd.collect();
Map<List<String>, ?> rddMaps = ImmutableMap.of(f, 1);
Map<List<String>, ?> myListrdd = ImmutableMap.of(myList, 1);
JavaRDD<Map<List<String>, ?>> javaRDD = sc.parallelize(ImmutableList.of(rddMaps));
JavaEsSpark.saveToEs(javaRDD, "sample/docs");
}
});
【问题讨论】:
标签: java elasticsearch time apache-kafka spark-streaming