【问题标题】:How can I know that I have consumed all of a Kafka Topic?我怎么知道我已经消费了所有的 Kafka 主题?
【发布时间】:2018-07-03 19:27:40
【问题描述】:

我正在使用 Flink v1.4.0。我正在使用Kafka FLink Consumer 使用来自Kafka 主题的数据,如下所示:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
...

有没有办法知道我是否已经消费了整个主题?如何监控偏移量? (这是否足以确认我已使用 Kafka 主题中的所有数据?)

【问题讨论】:

    标签: apache-kafka apache-flink


    【解决方案1】:

    由于 Kafka 通常用于连续的数据流,因此使用“全部”主题可能是一个有意义的概念,也可能不是一个有意义的概念。我建议你看看documentation on how Flink exposes Kafka's metrics,里面有这样的解释:

    The difference between the committed offset and the most recent offset in 
    each partition is called the consumer lag. If the Flink topology is consuming 
    the data slower from the topic than new data is added, the lag will increase 
    and the consumer will fall behind. For large production deployments we 
    recommend monitoring that metric to avoid increasing latency.
    

    因此,如果消费者滞后为零,那么您就赶上了。也就是说,您可能希望能够自己比较偏移量,但我不知道有什么简单的方法可以做到这一点。

    【讨论】:

      【解决方案2】:

      Kafka 用作流媒体源,流没有终点。

      如果我没记错的话,Flink 的 Kafka 连接器每 X 毫秒从一个 Topic 中拉取数据,因为所有 kafka 消费者都是 Active 消费者,如果一个主题内有新数据,Kafka 不会通知你

      因此,在您的情况下,只需设置一个超时时间,如果您在那段时间内没有读取数据,那么您已经读取了主题内的所有数据。

      无论如何,如果你需要读取一批有限的数据,你可以使用 Flink 的一些 Windows 或者在你的 Kafka 主题中引入某种标记,来分隔批的开始和开始。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-12-10
        • 1970-01-01
        • 2011-10-07
        • 1970-01-01
        • 2016-09-28
        • 2017-04-05
        相关资源
        最近更新 更多