【发布时间】:2016-06-27 17:14:03
【问题描述】:
我的 spark-streaming 应用程序在没有 ZooKeeper 帮助的情况下使用直接流方法从 Kafka 读取数据。我想处理失败,例如在我的应用程序中遵循 Exactly-once Semantics。我关注this 以供参考。一切看起来都很完美,除了:
val stream: InputDStream[(String,Long)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Long)](
ssc, kafkaParams, fromOffsets,
// we're just going to count messages per topic, don't care about the contents, so convert each message to (topic, 1)
(mmd: MessageAndMetadata[String, String]) => (mmd.topic, 1L))
在应用程序的第一次运行中,由于不会读取偏移量,所以要为fromOffsets Map 参数传递什么值?我当然错过了一些东西。
感谢并感谢任何帮助!
【问题讨论】:
-
第一个偏移量是
0L——你想用它作为每个TopicAndPartition的fromOffset
标签: scala apache-spark apache-kafka spark-streaming kafka-consumer-api