【问题标题】:Consuming from the beginning of a kafka topic with Flink使用 Flink 从一个 kafka 主题开始消费
【发布时间】:2016-09-04 09:20:55
【问题描述】:

如何确保我始终使用 Flink 从 Kafka 主题的开头开始消费?

随着 Kafka 0.9.x consumer 是 Fl​​ink 1.0.2 的一部分,似乎不再是 Kafka 而是 Flink 来控制偏移量:

Flink 在内部对偏移量进行快照,作为其一部分 分布式检查点。提交给 Kafka / ZooKeeper 的偏移量 只是为了让外界对进度的看法与 Flink 的同步 进度的视图。这样,监控和其他工作可以获得 查看 Flink Kafka 消费者对主题的消费程度。

这就是我的进展,但我的 Flink 程序总是从它停止的地方开始,并且不会按照配置的指示返回到开头:

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myflinkservice")
props.setProperty("auto.offset.reset", "earliest")

val incomingData = env.addSource(
  new FlinkKafkaConsumer09[IncomingDataRecord](
    "my.topic.name",
    new IncomingDataSchema,
    props
  )
)

【问题讨论】:

    标签: scala apache-kafka apache-zookeeper apache-flink flink-streaming


    【解决方案1】:

    用途:

    consumer.setStartFromEarliest();
    

    【讨论】:

      【解决方案2】:

      我认为你可以通过指定一个随机的group.id 来解决这个问题:

      val props = new Properties()
      props.setProperty("bootstrap.servers", "localhost:9092");
      props.setProperty("group.id", s"myflinkservice_${UUID.randomUUID}")
      props.setProperty("auto.offset.reset", "smallest") // "smallest", not "earliest"
      

      auto.offset.reset 仅在 ZooKeeper 中没有可用的初始偏移量时有效。

      【讨论】:

      • 这就是我一直在做的 :-) 但这并不实用,因为我失去了为我的 Kafka 消费者指定组的能力。根据我在我的问题中引用的文档,Flink 1.0.2 显然不再依赖提交给 zookeeper 的偏移量并跟踪偏移量本身。
      • 当 Kafka 消费者启动且组没有可用的偏移量时,它将根据偏移量重置策略启动。 Flink 所做的所有偏移跟踪仅适用于作业的生命周期(保存点除外)。因此,一旦作业停止,我们将依赖 ZK 或自动偏移策略。您正在寻找的是完全禁用偏移提交(到 ZK 或代理)的能力,以便 Flink 永远无法获取偏移量。这是目前 Flink 的 Kafka 连接器中没有的功能
      • 你确定最小的是正确的吗?我得到了这个例外,但它最早可以正常工作。 org.apache.kafka.common.config.ConfigException:配置 auto.offset.reset 的最小值无效:字符串必须是以下之一:最新、最早、无
      猜你喜欢
      • 2020-07-18
      • 2018-12-31
      • 2021-07-06
      • 1970-01-01
      • 2016-08-27
      • 2021-12-12
      • 2017-02-15
      • 2020-03-19
      • 1970-01-01
      相关资源
      最近更新 更多