【问题标题】:How to send Kafka message from one topic to another topic?如何将 Kafka 消息从一个主题发送到另一个主题?
【发布时间】:2020-05-09 23:27:41
【问题描述】:

假设我的生产者正在将消息写入主题 A...一旦消息在主题 A 中,我想将相同的消息复制到主题 B。这在 kafka 中是否可行?

【问题讨论】:

  • 欢迎来到 StackOverflow!为什么要在主题之间复制数据?为什么不只是从主题“B”中消费的东西只是从主题“A”中消费呢?还是要过滤数据?还是将它放在单独的集群上?如果是这样,请编辑您的问题以澄清:)

标签: apache-kafka kafka-consumer-api apache-kafka-streams


【解决方案1】:

如果我理解正确,您只需要stream.to("topic-b"),不过,如果不对数据进行某些操作,这似乎很奇怪。

注意:

指定主题在使用前需手动创建

【讨论】:

    【解决方案2】:

    我不清楚您通过简单地将数据从一个主题复制到另一个主题来实现什么用例。如果两个主题都在同一个 Kafka 集群中,那么拥有两个具有相同消息/内容的主题绝不是一个好主意

    我认为这里的差距可能是您对 Kafka 中的消费者组 的概念并不清楚。通过使用来自 Kafka 主题的消息,您可能需要执行两个操作项。而且您相信如果第一个应用程序使用来自 Kafka 主题的消息,那么第二个应用程序是否可以使用相同的消息。 Kafka 可以让你在消费者组的帮助下解决这种常见的用例。

    让我们尝试区分其他消息队列和 Kafka,您会明白您不需要在两个主题之间复制相同的数据/消息。

    在其他消息队列中,例如 SQS(Simple Queue Service),如果消息被消费者消费,则相同的消息不能被其他消费者消费。消费者有责任在处理完消息后安全地删除消息。通过这样做,我们可以保证同一消息不会被两个消费者处理而导致不一致。

    但是,在 Kafka 中,让多组消费者从同一个主题消费是完全可以的。这组消费者形成一个通常称为消费者组的组。在这里,来自消费者组的消费者之一可以根据正在消费消息的 Kafka 主题的分区来处理消息。

    现在要注意的是,我们可以让多个消费者群体从同一个 Kafka 主题中消费。每个消费者组都会按照自己想要的方式处理消息。 两个不同消费群体的消费者之间没有干扰

    为了满足您的用例,我相信您可能需要两个能够以他们想要的方式简单地处理消息的消费者组。您基本上不必在两个主题之间复制数据。

    希望这会有所帮助。

    【讨论】:

      【解决方案3】:

      有两个直接选项可以将一个主题的内容转发到另一个主题:

      1. 利用Kafka的流特性创建转发链接 两个主题之间。
      2. 通过创建消费者/生产者对 并使用它们来接收然后转发消息

      我有一小段代码显示两者(在 Scala 中):

        def topologyPlan(): StreamsBuilder = {
          val builder = new StreamsBuilder
          val inputTopic: KStream[String, String] = builder.stream[String, String]("topic2")
          inputTopic.to("topic3")
          builder
        }
      
        def run() = {
          val kafkaStreams = createStreams(topologyPlan())
          kafkaStreams.start()
      
          val kafkaConsumer = createConsumer()
          val kafkaProducer = createProducer()
          kafkaConsumer.subscribe(List("topic1").asJava)
          while (true) {
            val record = kafkaConsumer.poll(Duration.ofSeconds(5)).asScala
            for (data <- record.iterator) {
              kafkaProducer.send(new ProducerRecord[String, String]("topic2", data.value()))
            }
          }
        }
      

      查看 run 方法,前两行设置了一个流对象,使用 topologyPlan() 侦听“topic2”中的消息,然后转发到“topic3”。

      剩下的几行展示了消费者如何收听“topic1”并使用生产者将它们发送到“topic2”。

      这里示例的最后一点是 Kafka 足够灵活,可以让您根据需要混合选项,因此上面的代码将在“topic1”中获取消息,并通过“topic2”将它们发送到“topic3”。

      如果您想查看设置消费者、生产者和流的代码,请查看完整类here

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-12-13
        • 2021-03-18
        • 2021-06-17
        • 2019-10-09
        • 2019-07-25
        • 1970-01-01
        相关资源
        最近更新 更多