【发布时间】:2018-03-08 12:40:50
【问题描述】:
我在火花流中从 Kafka 流中获得数百万条消息。有 15 种不同类型的消息。消息来自一个主题。我只能通过内容来区分消息。所以我使用 rdd.contains 方法来获取不同类型的 rdd。
示例消息
{"a":"foo", "b":"bar","type":"first" .......}
{"a":"foo1", "b":"bar1","type":"second" .......}
{"a":"foo2", "b":"bar2","type":"第三个".......}
{"a":"foo", "b":"bar","type":"first" .......}
.............
.............
.........
等等
代码
DStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val rdd_first = rdd.filter {
ele => ele.contains("First")
}
if (!rdd_first.isEmpty()) {
insertIntoTableFirst(hivecontext.read.json(rdd_first))
}
val rdd_second = rdd.filter {
ele => ele.contains("Second")
}
if (!rdd_second.isEmpty()) {
insertIntoTableSecond(hivecontext.read.json(rdd_second))
}
.............
......
same way for 15 different rdd
有没有办法从 kafka 主题消息中获取不同的 rdd?
【问题讨论】:
标签: scala apache-kafka spark-streaming contains