【问题标题】:Dynamic topic in kafka connectorkafka 连接器中的动态主题
【发布时间】:2019-01-07 00:22:27
【问题描述】:

kafka 添加了在连接器中使用正则表达式的新功能,但是似乎连接器启动后来自新添加主题的主题数据在连接器重新启动之前不会被消耗。我们需要动态添加新主题,并让连接器根据连接器属性中定义的正则表达式使用主题。如何实现? 例如:正则表达式:topic-.* 主题:主题1,主题2 如果我引入了新的主题 topic-3,那么如何让连接器在不重启的情况下消费主题数据?

【问题讨论】:

  • 正如Slack 组中所讨论的,Kafka Connect 只会在连接器启动时处理正则表达式。如果在此之后正则表达式的结果发生变化,则连接器不会选择新的结果。您希望多久添加一次新主题?
  • 目前可以每两周或一个月添加一次主题,但我们确信频率会增加。我正在寻找的是是否可以为上述用例采用任何替代方案,以及每次重新启动连接器时重新启动连接器的含义。我了解 kafka 连接器具有偏移管理并保证消息的传递,但是,如果您能提供一些关于生产环境下连接器重启的影响的资源,将会有所帮助。
  • 为什么不做一个新的连接器呢?与仅尝试从不断增长的主题中消耗最多 N 个任务相比,这将具有更好的扩展性和更高的容错性

标签: apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

按照其他人已经在 cmets 中给出的想法,基本上你需要做的就是建立一个机制来识别已经引入了一个新主题并且需要干净地重新启动连接器。

我会做这样的事情,

1> 在已连接的主题(例如 topic-1)中发送特定类型的消息,如果收到这样的消息,代码应保持所有新的 msg 轮询并等待所有偏移提交完成。

2> 然后从轮询循环中中断并从您的消费者(consumer.unsubscribe())中删除订阅。

3> 在订阅正则表达式主题的常规流程之后,需要遵循一开始就完成的流程,因为新主题现在将成为正则表达式的一部分。

记住提交很重要,如果你匆忙重启连接器,你可能会得到重复。也很明显不要更改 group.id 并将 auto.offset.reset 保持为“最新”。

【讨论】:

    【解决方案2】:

    Kafka 消费者有一个选项metadata.max.age.ms - 消费者刷新主题元数据的时间间隔。如果您不需要实时,它可能会有所帮助。另见:kafka consumer to dynamically detect topics added

    /etc/kafka-connect/kafka-connect.properties 中,您应该指定consumer.metadata.max.age.ms=1000 1 秒。

    【讨论】:

      猜你喜欢
      • 2017-02-02
      • 2019-02-20
      • 2021-03-16
      • 1970-01-01
      • 2018-08-03
      • 2015-09-03
      • 1970-01-01
      • 1970-01-01
      • 2020-04-29
      相关资源
      最近更新 更多