【发布时间】:2018-02-04 05:55:20
【问题描述】:
我有如下代码可以从 Kafka 读取并保存到 Elasticsearch,我正在使用 spark 流:
JavaDStream<String> liness = messages.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
System.out.println(kafkaRecord.topic() + kafkaRecord.value());
return kafkaRecord.value();
}
});
JavaEsSparkStreaming.saveJsonToEs(lines, "events/redict");
jssc.start();
我的问题是我需要根据主题名称使 "events/redict" 动态化,我也可以在 messages.map 调用中获取主题名称但是如何在此处添加动态?
【问题讨论】:
-
如何从消费者记录映射到一个由两个字段组成的新对象,值的字符串和取决于主题的值?
-
如果您只想将 Kakfa 记录保存到 Elastic,为什么不使用 Logstash 或 Kafka Connect?
-
谢谢,板球我更喜欢 Spark。
-
是的,Dominik,这是一种方法,但我正在寻找一种我必须更改消息的方法。
-
所以您宁愿编程并将其部署到某种类型的 Spark 集群,然后使用内置于 ELK 堆栈或 Kafka 中的工具?
标签: java apache-spark apache-kafka spark-streaming