【问题标题】:Dynamically create and change Kafka topics with Flink使用 Flink 动态创建和更改 Kafka 主题
【发布时间】:2019-01-24 23:09:00
【问题描述】:

我正在使用 Flink 从不同的 Kafka 主题中读取和写入数据。 具体来说,我使用的是 FlinkKafkaConsumer 和 FlinkKafkaProducer。

我想知道是否可以根据我的程序中的逻辑或记录本身的内容将我正在读取和写入的 Kafka 主题更改为“即时”。

例如,如果读取了带有新字段的记录,我想创建一个新主题并开始将带有该字段的记录转移到新主题。

谢谢。

【问题讨论】:

  • 与其做一个全新的话题,为什么不使用原生支持添加字段的Avro格式,即“模式进化”?
  • @cricket_007 我问这个问题是为了帮助设计我们的控制流程——我们想发送关于各种 Kafka 主题的规则以更新程序逻辑,我很好奇 Flink 的 Kafka 连接器的局限性处于当前状态。
  • @GeorgyGobozov 谢谢 - 我希望有更好的消息 :)

标签: apache-kafka apache-flink


【解决方案1】:

如果您的主题遵循通用命名模式,例如“topic-n*”,您的 Flink Kafka 消费者可以自动读取“topic-n1”、“topic-n2”……等等因为它们被添加到 Kafka。

Flink 1.5 (FlinkKafkaConsumer09) 增加了对基于正则表达式的动态分区发现和主题发现的支持。这意味着 Flink-Kafka 消费者可以拿起新的 Kafka 分区,而无需重新启动作业,同时保持完全一次的保证。

接受subscriptionPattern的消费者构造函数:link.

【讨论】:

    【解决方案2】:

    更多地考虑需求,

    第一步是 - 您将从一个主题开始(为简单起见),并在运行时根据提供的数据生成更多主题,并将相应的消息定向到这些主题。这是完全可能的,不会是一个复杂的代码。使用 ZkClient API 检查 topic-name 是否存在,如果不存在,则创建一个具有新名称的模型主题,并通过与该新主题相关联的新生产者开始将消息推送到其中。您无需重新启动作业即可为特定主题生成消息。

    你的初始消费者成为生产者(新主题)+消费者(旧主题)

    第二步是 - 你想为新主题消费消息。一种方法可能是完全产生一份新工作。为此,您可以先创建一个线程池并向它们提供参数。

    再次对此要更加小心,如果出现循环错误,更多的自动化可能会导致集群过载。想想如果输入数据不受控制或只是脏了一段时间后创建的主题过多的可能性。上面提到的 cmets 中可能有更好的架构方法。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-06-30
      • 1970-01-01
      • 2021-03-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多