【问题标题】:How copy some message from one kafka topic to another from bash?如何从 bash 将一些消息从一个 kafka 主题复制到另一个主题?
【发布时间】:2020-04-02 13:01:19
【问题描述】:

请帮忙

我们有 2 个 kafka 主题。我想从 topic1 到 topic2 复制 10 条消息。

我正在尝试使用 kafka-console-consumerkafka-console-producer

首先我将 10 条消息从 topic1 保存到某个目录:

for (( i=1; i<=10; i++ )); do bin/kafka-console-consumer.sh --bootstrap-server 1.1.2.3:9092 --group CONSUMER1 --topic TOPIC1 --max-messages 1 > /tmp/_topic/$i.msg; done;

然后我尝试使用 kafka-console-producer 将其发送到 topic2:

for (( i=1; i<=10; i++ )); do  bin/kafka-console-producer.sh --broker-list 1.1.2.4:9092 --topic TOPIC2 < /tmp/_topic/$i.msg; done;

我得到了错误 - 我的服务无法反序列化数据。 我的问题是:

  1. 我的解决方案行得通吗?
  2. 为什么我会收到此错误?
  3. 将消息从一个主题复制到另一个主题的最佳方法是什么?

更新: 我如何解决这个问题(感谢:Robin Moffatt): 我使用 kafka-mirror 和这个 jar : https://github.com/opencore/mirrormaker_topic_rename 有了这个,我可以将消息从一个主题 kafka 复制到一个集群上的另一个主题

【问题讨论】:

    标签: apache-kafka kafkacat


    【解决方案1】:

    你可以用kafkacat做到这一点:

    kafkacat -b localhost:9092 -C -t source-topic -K: -e -o beginning -c10 | \
    kafkacat -b localhost:9092 -P -t target-topic -K: 
    
    • | 将第一个 kafkacat(-C 消费者)的输出重定向到第二个 kafkacat(-P 生产者)的输入中
    • -c10 表示只消费 10 条消息
    • -o beginning 表示从主题的开头开始。

    请注意,如果您有二进制数据(例如 Avro),这将不起作用。要正确执行此操作,请使用 Replicator 或 MirrorMaker2 和 ByteArrayConverter 之类的东西。

    参考:https://rmoff.net/2019/09/29/copying-data-between-kafka-clusters-with-kafkacat/

    【讨论】:

    • 感谢您的回答。据我了解,镜像制造商只能将一个主题镜像到另一个集群上的同一主题(具有相同的名称等),但我需要在一个集群上将消息从 topic1 复制到 topic2。我们使用 avro :(
    • 我没用过,但是我不明白为什么你不能在MirrorMaker2中给你的源和目标代理一样的。因为它是基于 Kafka Connect 构建的,所以您应该能够使用 RegExRouter 单消息转换 (SMT) 将主题从 topic1 路由到 topic2
    • 这很有用,但分隔符有问题。我试图使用不支持的字符作为分隔符。这没有记录,但今天的 kafkacat 只接受单字节的分隔符(即包含在前 127 个 ascii 字符中)。该修复已在此处开放 3 年:github.com/edenhill/kafkacat/pull/150
    猜你喜欢
    • 1970-01-01
    • 2020-05-09
    • 1970-01-01
    • 2017-12-13
    • 1970-01-01
    • 2019-10-09
    • 2021-03-18
    • 2019-06-23
    • 1970-01-01
    相关资源
    最近更新 更多