【问题标题】:How to distribute messages between Kafka topics with different configuration?如何在不同配置的 Kafka 主题之间分发消息?
【发布时间】:2020-08-19 09:17:31
【问题描述】:

我正在寻找一种如何在两个 Kafka 主题之间分发消息的方法。在原始主题中,我有 20 个分区,每个分区有 1000000 条消息。我希望有一个具有 1000 个分区的新主题,并在新的更广泛的分区范围内传播消息。

1T -> 20P -> 1000000 messages per partition (total 20m/topic)
2T -> 1000P -> 20000 messages per partition (total 20m/topic)

是否可以在 Kafka 中做到这一点(通过主题镜像或其他技术)?

【问题讨论】:

    标签: apache-kafka apache-kafka-mirrormaker kafka-partition


    【解决方案1】:

    您可以使用 Kafka 附带的 MirrorMaker(版本 1)。该工具主要用于将数据从一个数据中心复制到另一个数据中心。它是建立在主题名称在两个集群中保持相同的假设之上的。

    但是,您可以提供重命名主题的自定义 MessageHandler

    package org.xxx.java;
    
    import java.util.Collections;
    import java.util.List;
    import kafka.consumer.BaseConsumerRecord;
    import kafka.tools.MirrorMaker;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    
    /**
     * An example implementation of MirrorMakerMessageHandler that allows to rename topic.
     */
    public class TopicRenameHandler implements MirrorMaker.MirrorMakerMessageHandler {
      private final String newName;
    
      public TopicRenameHandler(String newName) {
        this.newName = newName;
      }
    
      public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
        return Collections.singletonList(new ProducerRecord<byte[], byte[]>(newName, record.partition(), record.key(), record.value()));
      }
    }
    

    我在pom.xml 文件中使用了以下依赖项

        <properties>
            <kafka.version>2.5.0</kafka.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${kafka.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.13</artifactId>
                <version>${kafka.version}</version>
            </dependency>
        </dependencies>
    

    编译上面的代码并确保将你的类添加到CLASSPATH

    export CLASSPATH=$CLASSPATH:/.../target/MirrorMakerRenameTopics-1.0.jar
    

    现在,连同一些基本的consumer.properties

    bootstrap.servers=localhost:9092
    client.id=mirror-maker-consumer
    group.id=mirror-maker-rename-topic
    auto.offset.reset=earliest
    

    producer.properties

    bootstrap.servers=localhost:9092
    client.id=mirror-maker-producer
    

    您可以拨打以下电话kafka-mirror-maker

    kafka-mirror-maker --consumer.config /path/to/consumer.properties \
     --producer.config /path/to/producer.properties \
     --num.streams 1 \
     --whitelist="topicToBeRenamed" \
     --message.handler org.xxx.java.TopicRenameHandler \
     --message.handler.args "newTopicName"
    

    请注意此方法的以下两个注意事项:

    • 当您计划更改分区数时,新主题中的消息顺序可能与旧主题不同。默认情况下,消息按 Kafka 中的键进行分区。
    • 使用 MirrorMaker 不会复制旧主题中的现有偏移量,而是开始写入新的偏移量。因此,旧主题的偏移量与新主题的偏移量之间(几乎)没有关系。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-08-18
      • 1970-01-01
      • 2015-11-30
      • 1970-01-01
      • 2021-05-18
      • 2016-01-21
      • 2019-02-25
      • 1970-01-01
      相关资源
      最近更新 更多