【发布时间】:2019-04-06 00:20:00
【问题描述】:
应用程序从 kafka 主题中读取。 每条消息必须是唯一的(忽略重复) 保存“N”秒的数据 并作为单独的消息写入不同的 kafka 主题
有没有办法将消息保存'N秒'并写入kafka 每条消息必须在收到消息后的“N”秒后写入同一个主题。
目前我将数据保存在内存中的 json 结构中,每次收到消息时,我都会遍历我拥有的所有消息并比较时间。
这当然不是办法
val some_consumer= new FlinkKafkaConsumer09(data_topic
, new JSONKeyValueDeserializationSchema(false), properties)
some_consumer.setStartFromLatest()
val in_stream = env.addSource(some_consumer)
.filter(!_.isNull)
.map(x => processMessage(x))
def process(x: ObjectNode){
// store message in json if not existing
// loop through entire set and compare times
// if after 'N' seconds
// write to kafka
kafka_producer.send(new ProducerRecord[String, String](output_topic, the_unique_message))
}
【问题讨论】:
-
即使不推荐,如果在
.send()前加Thread.sleep()会怎样?
标签: scala apache-kafka apache-flink