【问题标题】:Single partition consumer speed and throughput单分区消费者速度和吞吐量
【发布时间】:2022-01-22 10:50:10
【问题描述】:

我正在开发一个系统,该系统将大量事件数据从数据库导入到 Apache Kafka,然后我的 Java 应用程序处理这些数据,然后将消息放回 Kafka 主题。

我正在使用 Debezium 和 Kafka Connect 将数据导入 Kafka。然后来自 Debezium 的数据由我的消费者应用程序获取。这个消费者必须从单个分区中读取,因为我需要来自数据库的排序保证。问题是单个消费者跟不上Debezium生产者,所以我的消息越来越延迟。是否有可能以某种方式提高这个消费者的速度?提高消费者速度和吞吐量的最重要配置是什么?

我的 Debezium 消息不包含架构信息,因此它们并不大。 我的消费者是使用具有以下配置的 Kafka-Streams 库实现的:

Properties properties = new Properties();

properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");

properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

properties.put(StreamsConfig.POLL_MS_CONFIG,50);

properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);

我的这个消费者的拓扑如下:

public Topology createTopology(String debeziumTopic, String otherTopic) {
    
    JsonDebeziumSerde jsonDebeziumSerde = new JsonDebeziumSerde();

    
    StreamsBuilder streamsBuilder = new StreamsBuilder();

    
    streamsBuilder.stream(debeziumTopic, Consumed.with(Serdes.String(), jsonDebeziumSerde))
                  
        .foreach((k, v) -> {
                      
            try {
                          
                String id = v.get("ID").textValue();
                          
                kafkaTemplate.send(otherTopic, id, v);
                                     
            } catch (NullPointerException ex) {
                          
                log.warn(debeziumTopic + " has empty message");
   
            }
                 
     });

    
    Topology topology = streamsBuilder.build();

    
   
    return topology;

}

我的代理配置:

auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
zookeeper.connection.timeout.ms=1000
log.retention.hours=1
num.partitions=10
delete.topic.enable=true

【问题讨论】:

  • 您不应使用 KafkaProducer(或 Spring KafkaTemplate)从您的 Kafka Streams 拓扑中生成消息。 KafkaStreams 本身具有嵌入式生产者,当您使用“to(...)”指令流式传输结果时,将使用这些嵌入式生产者。不确定这是否能解决您的性能问题,但它会给您带来诸如 Exactly Once Processing 之类的好处。
  • 消息/秒或兆字节/秒的吞吐量是多少?要了解您的集群支持什么,您可以与 kafka-producer-perf-test 和 kafka-consumer-perf-test 命令行实用程序进行比较。
  • “您不应该使用 KafkaProducer(或 Spring KafkaTemplate)从您的 Kafka Streams 拓扑中生成消息。” _ 你说得对,我改变了这个拓扑,现在我使用 .stream().to() 而不是 kafkaTemplate 老实说,我不知道如何检查我的 java 应用程序流的通过率

标签: java apache-kafka apache-kafka-streams debezium


【解决方案1】:

很难就性能给出一般性建议,但我会尝试分享一些我的经验。我正在运行一个 Kafka Streams 应用程序,它可以轻松地在单个线程上达到每秒超​​过 100k 条消息的吞吐量。不确定,这与您的用例相比如何。为了到达那里,我正在使用 VisualVM 及其采样器检查应用程序。但是任何分析器都可以。这将向您展示您的应用程序的瓶颈在哪里。

对我来说,这是 JSON 序列化和反序列化,我通过切换到协议缓冲区对其进行了改进。不确定,如果这是您的选择。也许 avro 对你来说是一个更好的选择。

另一个重大改进是对主题使用 zstd 压缩,这大大减少了数据量和网络负载。这也大大加快了 Kafka 流的速度。

最后,我想知道,您为什么使用 KafkaTemplate 来写入输出主题。我本来期望这样的 dsl 表达式:

streamsBuilder.stream(debeziumTopic, Consumed.with(Serdes.String(), jsonDebeziumSerde))
 
  .selectKey((k,v) -> {
    var id = v.get("ID");
    if (id != null) {
      return id.textValue();
    }
    log.warn(debeziumTopic + " has empty message");

    return null;
  })
  .filter((k,v) -> k != null)
  .to(otherTopic, Produced.with(Serdes.String(), jsonDebeziumSerde));

我不确定这种方法对性能的影响,但它对于 Kafka Streams 来说更惯用,因此它可能更有效。

【讨论】:

  • 感谢您的回复。我会测试你的建议,也许会有所帮助。
  • 告诉我,进展如何。
猜你喜欢
  • 2020-10-13
  • 2012-07-05
  • 2013-04-16
  • 1970-01-01
  • 1970-01-01
  • 2015-04-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多