【问题标题】:Kafka MirrorMaker 2.0 duplicate each messagesKafka MirrorMaker 2.0 复制每条消息
【发布时间】:2020-05-17 05:11:04
【问题描述】:

我正在尝试使用 MirrorMaker 2.0 复制 Kafka 集群。我正在使用以下 mm2.properties:

name = mirror-site1-site2
topics = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
plugin.path=/usr/share/java/kafka/plugin
clusters = site1, site2

# for demo, source and target clusters are the same
source.cluster.alias = site1
target.cluster.alias = site2

site1.sasl.mechanism=SCRAM-SHA-256
site1.security.protocol=SASL_PLAINTEXT
site1.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="<someuser>" \
   password="<somepass>";

site2.sasl.mechanism=SCRAM-SHA-256
site2.security.protocol=SASL_PLAINTEXT
site2.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="<someuser>" \
   password="<somepass>";

site1.bootstrap.servers = <IP1>:9093, <IP2>:9093, <IP3>:9093, <IP4>:9093
site2.bootstrap.servers = <IP5>:9093, <IP6>:9093, <IP7>:9093, <IP8>:9093

site1->site2.enabled = true
site1->site2.topics = topic1


# use ByteArrayConverter to ensure that records are not re-encoded
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

所以问题来了,mm2 似乎总是复制 x3 消息:

# Manual message production: 

 kafkacat -P -b <IP1>:9093,<IP2>:9093,<IP3>:9093,<IP4>:9093 -t "topic1"


# Result in the source topic (site1 cluster): 

% Reached end of topic topic1 [2] at offset 405
Message1
% Reached end of topic topic1 [2] at offset 406
Message2
% Reached end of topic topic1 [6] at offset 408
Message3
% Reached end of topic topic1 [2] at offset 407

 kafkacat -P -b <IP5>:9093,<IP6>:9093,<IP7>:9093,<IP8>:9093 -t "site1.topic1"

# Result in the target topic (site2 cluster): 

% Reached end of topic site1.titi [2] at offset 1216
Message1
Message1
Message1
% Reached end of topic site1.titi [2] at offset 1219
Message2
Message2
Message2
% Reached end of topic site1.titi [6] at offset 1229
Message3
Message3
Message3

我尝试使用来自 confluent 包的 Kafka 和直接来自 Apache 的 kafka_2.13-2.4.0,两者都使用 Debian 10.1。

我首先在 confluent 5.4 中鼓励这种行为,认为这可能是他们的包中的一个错误,因为他们有复制器并且不应该真正关心 mm2,但我直接从 kafka_2.13-2.4.0 复制了完全相同的问题Apache没有任何改变。

我知道 mm2 还不是幂等的,不能保证一次交付。在我的测试中(我尝试了很多东西,包括生产者调整或更大批量的数千条消息)。在所有这些测试中,mm2 总是重复 X3 的所有消息。

我错过了什么,有人鼓励同样的事情吗?作为具有相同软件包的旧版 mm1 的站点注释,我没有这个问题。

感谢任何帮助...谢谢!


即使更改日​​志没有让我对改进非常有信心,但这次我再次尝试从 kafka 2.4.1 运行 mm2。 => 这些奇怪的重复总是没有变化。

我在新服务器上安装了这个版本,以确保我遇到的奇怪行为与服务器无关。

当我使用 ACL 时,我需要特殊权限吗?我把“全部”认为它不能更宽容......即使 mm2 不是幂等的,我也会尝试与此相关的权利。

更让我吃惊的是,我找不到任何报告这样的问题,我肯定做错了什么,但那是什么问题......

【问题讨论】:

    标签: apache-kafka apache-kafka-connect apache-kafka-mirrormaker


    【解决方案1】:

    您需要从配置中删除 connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector,因为这告诉 Mirror Maker 将此类用于它生成的心跳和检查点连接器以及复制数据的源连接器,并且此类使它们的行为与源连接器,这就是为什么每次复制 3 条消息的原因,实际上您生成了 3 个源连接器。

    【讨论】:

    • 这并没有提供问题的答案。您可以search for similar questions,或参考页面右侧的相关和链接问题找到答案。如果您有一个相关但不同的问题,ask a new question,并包含指向此问题的链接以帮助提供上下文。见:Ask questions, get answers, no distractions
    • 我知道,我会发表评论而不是回答,但我无法发表评论,因为我还没有足够的代表,如果我创建一个新问题,它可能会被标记为重复,我认为最好让这个现有问题更具可见性。我已经尝试找到类似的问题但失败了。另外,我阅读了给出良好答案的指南,即使您无法回答问题,其中也有一些关于添加信息的内容,我认为我所说的提供了一些新信息,所以我认为这是我的唯一方法与 OP 分享。
    • 你是救生员!至少可以说,关于如何在三种不同的运行模式中配置 MirrorMaker 2.0 的 KIP-545 文档令人困惑。单个配置条目如何导致这种奇怪的行为?这至少应该被配置验证逻辑拒绝。无论如何,非常感谢!
    • 我可以确认删除该行是解决方案。
    【解决方案2】:

    对客户端配置启用幂等性将解决此问题。默认情况下,它将设置为 false。将以下内容添加到 mm2.properties 文件中

    source.cluster.producer.enable.idempotence = true
    target.cluster.producer.enable.idempotence = true
    

    【讨论】:

      猜你喜欢
      • 2020-05-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-06-12
      • 1970-01-01
      • 2016-01-07
      • 2020-12-22
      相关资源
      最近更新 更多