【问题标题】:Kafka connect (standalone) writing data to multiple partitionsKafka connect(独立)将数据写入多个分区
【发布时间】:2017-11-28 20:28:18
【问题描述】:

我正在尝试使用 Kafka 连接以使用独立模式写入数据。我正在向其中写入数据的主题是有多个分区。但是,数据仅写入其中一个分区。当我启动多个消费者控制台时,数据仅打印到其中一个。另一个消费者控制台只有在第一个控制台关闭后才能获得任何数据。我无法弄清楚我需要在配置文件中进行哪些更改才能使其写入多个分区。

这里是standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=1000
rest.port=8084

连接文件源.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test4.txt
topic=consumer_group

现在我使用以下命令来运行连接器:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

使用以下命令启动消费者控制台:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer_group --from-beginning --consumer-property group.id=new-consumer-group

它只将数据打印到其中一个消费者控制台。但是,如果我使用生产者控制台而不是 Kafka 连接来编写消息,那么我可以看到多个消费者(以循环方式)的消息,这是应该的方式。但是使用 Kafka 连接,它只是将所有数据写入单个分区,同一组中的其他消费者必须处于空闲状态。需要更改哪些内容才能写入轮询系统中的所有分区?

【问题讨论】:

    标签: apache-kafka apache-kafka-connect


    【解决方案1】:

    此答案适用于 Apache Kafka 0.10.2.1,但不一定适用于未来版本。

    您可能知道,文件源连接器生成带有null 键和null 主题分区号的消息。这意味着 Kafka Connect 的生产者可以使用它的 partitioner 分配主题分区,对于具有 null 键的消息,default partitioner 将尝试将消息循环到可用的 分区。

    但是,您遇到了 JSON 转换器的怪癖之一,该转换器通过 key.converter 和 @987654329 在 standalone.properties 文件中配置@属性:

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    

    当 JSON 转换器配置为启用模式时,JSON 表示会在值周围包含一个信封,以便键或值同时包含 模式和有效负载

    {
        "schema": ...,
        "payload": ...
    }
    

    您的standalone.properties 文件配置密钥转换器并启用模式,因此即使连接器使用null 键和null 模式生成消息,JSON 转换器(启用模式)始终把这些包在一个信封里。因此,每条消息的密钥都是:

    {
        "schema": null,
        "payload": null
    }
    

    生产者的默认分区器总是将这些相同的键散列到相同的分区

    要更改行为,请编辑您的standalone.properties 文件并将key.converter.schemas.enable 属性更改为false

    key.converter.schemas.enable=false
    

    您可以选择将value.converter.schemas.enable 属性更改为false 以更改 的写入方式,以不将值包装在信封中并包含架构:

    value.converter.schemas.enable=false
    

    这也影响了转换器如何处理 null 值,当删除具有特定键的源实体时,某些连接器会生成这些值。例如,当从源数据库中删除一行时,某些变更数据捕获连接器会执行此操作。这对log compacted topics 非常有效,因为每条消息都代表键控实体的最后一个已知状态,并且因为空 value 对应于告诉 Kafka 所有消息的 tombstone 记录在该墓碑之前使用相同的密钥都可以从日志中删除。但是,如果将值转换器配置为 启用模式的 JSON 转换器,则永远不会输出 null 消息值,因此日志压缩永远不会删除墓碑消息。这是一个小问题,但需要注意。

    如果您想在 JSON 中编码您的键和值,那么您可能不需要或不需要这些模式,因此可以为它们的键和值 JSON 转换器打开 schemas.enable

    对于那些真正使用模式的人,请考虑使用Confluent's Schema Registry 和 Avro 转换器。不仅编码的消息明显更小(由于 Avro 编码而不是 JSON 字符串编码),编码的消息包括 Avro 模式的 ID,因此允许您 evolve your message schemas over time 而无需协调升级您的生产者和消费者以使用完全相同的模式。各种优点都有!

    【讨论】:

    • 太棒了!该解决方案奏效了,感谢您提供有关它为什么以及如何工作的见解。
    猜你喜欢
    • 2020-05-11
    • 1970-01-01
    • 2017-04-01
    • 2021-07-03
    • 2019-04-05
    • 2017-09-13
    • 2020-06-02
    • 2019-09-21
    • 1970-01-01
    相关资源
    最近更新 更多