【Posted】: 2018-07-03 19:27:40
【Question】:
I am using Flink v1.4.0. I am consuming data from a Kafka topic with the Flink Kafka consumer, as shown below:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour
DataStream<String> stream = env.addSource(myConsumer);
...
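Is there a way to know whether I have consumed the entire topic? How can I monitor the offsets? (Would that be enough to confirm that I have consumed all the data in the Kafka topic?)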
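To make "consumed the entire topic" concrete, here is a minimal sketch of the usual lag calculation: for each partition, subtract the consumer group's committed offset from the partition's log end offset. `ConsumerLag` and its methods are hypothetical helpers, not part of Flink or Kafka, and the offset maps are assumed to be fetched by some other means (for example from Kafka's own tooling or from the connector's metrics). It also assumes the committed value is the offset of the next record to read, so a lag of zero means the group had caught up at the moment the offsets were sampled. Since a topic is unbounded, this can only ever be a point-in-time answer.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not a Flink or Kafka class): compares per-partition offsets.
final class ConsumerLag {

    // Lag per partition = log end offset - committed offset.
    // Assumption: a partition with no committed offset is treated as unread from
    // offset 0, which overstates the lag if older records were already deleted.
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            // Clamp at zero in case the two maps were sampled at slightly different times.
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    // Sum of the lag over all partitions of the topic.
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0L;
        for (long partitionLag : lagPerPartition(endOffsets, committedOffsets).values()) {
            total += partitionLag;
        }
        return total;
    }

    // True when every partition has been read up to its end offset.
    static boolean caughtUp(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        return totalLag(endOffsets, committedOffsets) == 0L;
    }

    public static void main(String[] args) {
        // Made-up sample offsets for a two-partition topic.
        Map<Integer, Long> end = new TreeMap<>();
        end.put(0, 100L);
        end.put(1, 250L);
        Map<Integer, Long> committed = new TreeMap<>();
        committed.put(0, 100L);
        committed.put(1, 240L);

        System.out.println("lag per partition: " + lagPerPartition(end, committed));
        System.out.println("caught up: " + caughtUp(end, committed));
    }
}
```

One caveat for the Flink side: with checkpointing enabled, the Kafka consumer commits offsets only when a checkpoint completes, so externally visible committed offsets can trail what the job has actually read.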
【Discussion】: