【问题标题】:Kafka MirrorMaker2 - not mirroring consumer group offsetsKafka MirrorMaker2 - 不镜像消费者组偏移量
【发布时间】:2020-05-31 16:04:51
【问题描述】:

我已设置 MirrorMaker2 用于在 2 个 DC 之间复制数据。

我的 mm2.properties,

# mm2.properties
name=source->dest
clusters=source, dest

source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092

source->dest.enabled=true

offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1

在 MM2 启动时看到以下内容。

[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values: 
    admin.timeout.ms = 60000
    checkpoints.topic.replication.factor = 3
    config.action.reload = restart
    config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
    config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
    connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
    consumer.poll.timeout.ms = 1000
    emit.checkpoints.enabled = true
    emit.checkpoints.interval.seconds = 60
    emit.heartbeats.enabled = true
    emit.heartbeats.interval.seconds = 1
    enabled = true
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
    groups = [.*]
    groups.blacklist = [console-consumer-.*, connect-.*, __.*]
    header.converter = null
    heartbeats.topic.replication.factor = 3
    key.converter = null
    metric.reporters = null
    name = source->dest
    offset-syncs.topic.replication.factor = 3
    offset.lag.max = 100
    refresh.groups.enabled = true
    refresh.groups.interval.seconds = 600
    refresh.topics.enabled = true
    refresh.topics.interval.seconds = 600
    replication.factor = 2
    replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    replication.policy.separator = .
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    source.cluster.alias = source
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    sync.topic.acls.enabled = true
    sync.topic.acls.interval.seconds = 600
    sync.topic.configs.enabled = true
    sync.topic.configs.interval.seconds = 600
    target.cluster.alias = dest
    task.assigned.groups = null
    task.assigned.partitions = null
    tasks.max = 1
    topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
    topics = [.*]
    topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)

我的数据正在按预期复制。源主题在目标集群中作为源创建。但是,消费者组偏移量没有被复制。

在源集群中启动了一个消费者组。

./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group

消费了几条消息并停止了它。在该主题中发布了新消息,镜像制造商也将数据镜像到目标集群。

我尝试如下消费来自目标集群的消息。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group

由于我使用相同的消费者组,我希望我的偏移量也能同步,并且不会使用我在 cluster1 中使用的相同消息。但是,仍然消耗所有消息。我在这里有什么遗漏吗?

【问题讨论】:

    标签: apache-kafka apache-kafka-mirrormaker


    【解决方案1】:

    Kafka 2.7 引入了“自动消费者偏移同步”。 默认情况下,消费者偏移量不会在集群之间同步。 您应该明确启用此功能。

    support automated consumer offset sync across clusters in MM 2.0

    【讨论】:

      【解决方案2】:

      复制偏移量并非微不足道的几个根本原因:

      1. kafka 是一个至少一次的系统(忽略炒作)。这意味着镜像制造商,因为它建立在可以每次超时/断开连接的 kafka 消费者和生产者之上,将导致一定程度的重复记录被传递到目的地。这意味着偏移量不会在源和目标之间映射 1:1。即使您尝试使用“恰好一次”支持(MM2 KIP 明确表示它没有使用),它所做的只是跳过部分交付的批次,但这些批次仍会占用目的地的偏移量李>
      2. 如果您在源主题的记录开始过期很久之后设置镜像,您的目标主题将从偏移量 0 开始,而源主题将具有更高的“最旧”偏移量。已尝试解决此问题(请参阅KIP-391),但从未被接受
      3. 一般来说,不能保证您的镜像拓扑从单个源镜像到单个目标。 the linkedin topology,例如,从多个源集群镜像到“聚合”层集群。映射偏移对于此类拓扑毫无意义

      查看 MM2 KIP 时提到了“偏移同步主题”。 在您的代码中,您可以使用 RemoteClusterUtils 类在集群之间转换检查点:

      Map<TopicPartition, Long> newOffsets = RemoteClusterUtils.translateOffsets(
         newClusterProperties, oldClusterName, consumerGroupId
      );
      consumer.seek(newOffsets);
      

      这是从以下演示文稿中删除的 - https://www.slideshare.net/ConfluentInc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019

      或者,您可以使用 seek by timespamp API 在目标上启动您的消费者组,直到数据传递到目标(或传递到源,如果日志的代理设置在目标上附加时间戳)的粗略时间不要覆盖那些时间)。为了安全起见,您需要倒带一点。

      【讨论】:

        【解决方案3】:

        我的数据正在按预期复制。源主题在目标集群中作为源创建。但是,消费者组偏移量没有被复制。

        默认情况下,MM2 不会从kafka-console-consumer 复制消费者组。在启动时的 MM2 日志中,我们可以看到 groups.blacklist = [console-consumer-.*, connect-.*, __.*]。我相信you can override this 在您的mm2.properties 配置文件中。

        由于我使用相同的消费者组,我希望我的偏移量也能够同步,并且不会使用我在 cluster1 中使用的相同消息。

        一旦消费者组被正确镜像并启用检查点,目标集群中应该会自动创建一个内部主题(类似于dest.checkpoints.internal)。此检查点主题包含每个消费者组中镜像主题分区的源集群和目标集群中最后提交的偏移量。

        然后,您可以使用 Kafka 的 RemoteClusterUtils 实用程序类来转换这些偏移量并获取 source.test-1 的同步偏移量,该偏移量映射到消费者最后提交的 test-1 偏移量。如果您最终使用 Java 创建了一个使用者,您可以将 RemoteClusterUtils 作为依赖项添加到您的项目中:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-mirror-client</artifactId>
            <version>2.4.0</version>
        </dependency>
        

        否则,您可能必须编写一个包装RemoteClusterUtils.java 的工具来获取转换后的偏移量。此功能或something similar 看起来是作为future release for MM2 的一部分计划的。

        【讨论】:

          【解决方案4】:

          我看到你在检查点上的配置是

          emit.checkpoints.enabled = true 
          emit.checkpoints.interval.seconds = 60 
          

          因此,您的检查点主题将仅在 60 秒后反映新的更改。如果立即尝试它不会起作用所以,请在 1 分钟后尝试。

          【讨论】:

            猜你喜欢
            • 2021-05-01
            • 2019-05-01
            • 2020-03-22
            • 1970-01-01
            • 1970-01-01
            • 2018-12-17
            • 1970-01-01
            • 2020-06-11
            • 2018-02-03
            相关资源
            最近更新 更多