【发布时间】:2018-02-08 13:37:11
【问题描述】:
我正在使用 Scala 中的 Spark 在 Kafka 消费者应用程序中消费和处理消息。有时,处理来自 Kafka 消息队列的消息所需的时间比平时多一点。这时我需要消费最新的消息,忽略生产者已经发布但尚未消费的较早消息。
这是我的消费者代码:
object KafkaSparkConsumer extends MessageProcessor {
def main(args: scala.Array[String]): Unit = {
val properties = readProperties()
val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream")
val ssc = new StreamingContext(streamConf, Seconds(1))
val group_id = Random.alphanumeric.take(4).mkString("dfhSfv")
val kafkaParams = Map("metadata.broker.list" -> properties.getProperty("broker_connection_str"),
"zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"),
"group.id" -> group_id,
"auto.offset.reset" -> properties.getProperty("offset_reset"),
"zookeeper.session.timeout" -> properties.getProperty("zookeeper_timeout"))
val msgStream = KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
ssc,
kafkaParams,
Map("moved_object" -> 1),
StorageLevel.MEMORY_ONLY_SER
).map(_._2)
msgStream.foreachRDD { x =>
x.foreach {
msg => println("Message: "+msg)
processMessage(msg)
}
}
ssc.start()
ssc.awaitTermination()
}
}
有什么方法可以确保消费者始终获得消费者应用程序中的最新消息?或者我是否需要在 Kafka 配置中设置任何属性来实现相同的效果?
对此的任何帮助将不胜感激。谢谢
【问题讨论】:
标签: scala apache-spark apache-kafka spark-streaming kafka-consumer-api