【问题标题】:Increase Kafka Streams Consumer Throughput增加 Kafka Streams 消费者吞吐量
【发布时间】:2020-10-13 21:24:18
【问题描述】:

我有一个 Spark Streaming 应用程序和一个 Kafka Streams 应用程序并排运行,用于基准测试。两者都使用相同的输入主题并写入不同的目标数据库。输入主题有 15 个分区,火花流和 kafka 流都有 15 个消费者(比例为 1:1)。此外,事件有效负载约为 2kb。不确定它是否相关,但 Spark Streaming 的 90% 执行时间约为 9 毫秒。卡夫卡流,12 毫秒。每次处理消息时,都会在我的处理器中调用 commit() 方法。

问题依赖于高爆发。 Spark Streaming 可以跟上每秒 700 的速度,而 Kafka Streams 只能达到每秒 60/70 左右。我不能超越。见下图:(绿线 - Spark Streaming / 蓝线 - Kafka Streams)

根据下面的配置,只要每个消费者不超过 1000 个事件,考虑到背压,无论每个分区的字节数如何,火花流都可以跟上。至于 Kafka Streams,如果我正确理解了它的配置(请保持诚实),基于以下相同,我能够每 100 毫秒(poll.ms)获取最多 1000 条记录(max.poll.records),只要每个分区不超过 1MB (max.partition.fetch.bytes) 和每次提取不超过 50MB (fetch.max.bytes)。

无论我使用 5、10 还是 15 个消费者,我都看到相同的结果(每秒停留 70 个事件),这让我认为它与配置相关。我试图通过增加每次提取的记录数和每个分区的最大字节数来调整这些,但我没有得到显着的结果。

我知道这些是不同的技术并用于不同的目的,但我想知道我应该在 Kafka Streams 中使用哪些值来提高吞吐量。

Spark 流配置:

spark.batch.duration=10
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000
spark.streaming.kafka.maxRatePerPartition=100

Kafka Streams 配置(所有字节和时间相关)

# Consumer Config
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
heartbeat.interval.ms = 3000 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 300000 
max.poll.records = 1000 
request.timeout.ms = 30000
enable.auto.commit = false

# StreamsConfig
poll.ms=100 

处理器代码

public class KStreamsMessageProcessor extends AbstractProcessor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String payload) {

        ResponseEntity responseEntity = null;
        try {

          // Do Some processing

        } catch (final MyException e) {

          // Do Some Exception Handling

        } finally {

            context.forward(UUID.randomUUID().toString(), responseEntity);
            context.commit();
        }
    }

提前致谢!

【问题讨论】:

  • 这里没有专家:我认为问题在于 Kafka 中的提交。可以肯定的是,我在您发布的配置中看不到自动提交被禁用(enable.auto.commit)。我也很惊讶你消费了 70 个事件而不管消费者的数量......我想知道是否所有记录最终都进入同一个分区,你能验证所有分区之间的负载平衡吗?查看代码可能也有帮助!也可能是主题配置。
  • 您好 Augusto,感谢您的意见。只需添加代码以防万一。所有分区都与消费者很好地平衡。是的,自动提交被禁用。如果您想查看其他配置,请告诉我。

标签: spark-streaming kafka-consumer-api apache-kafka-streams spark-streaming-kafka


【解决方案1】:

如果我正确理解了它的配置(请保持诚实),基于以下相同,我能够每 100 毫秒 (poll.ms) 获取最多 1000 条记录 (max.poll.records),只要因为每个分区不超过 1MB (max.partition.fetch.bytes),每次提取不超过 50MB (fetch.max.bytes)。

这是不正确的。 :) max.poll.records 指定poll() 可以返回多少条记录——如果单个“获取”到代理返回更多记录,下一个“poll()”调用将从消费者的内部缓冲区提供服务(即,没有网络请求)。 max.poll.records 基本上是一个调整应用程序代码的旋钮,即在再次调用poll() 之前我要处理多少条记录。更频繁地调用poll() 会使您的应用程序更具反应性(例如,仅当调用poll() 时才会发生重新平衡——您还需要经常调用poll,即使不违反max.poll.interval.ms)。

poll.ms 是在没有数据可用的情况下poll() 内的最大阻塞时间。这样可以避免忙等待。但是,如果有数据,poll() 会立即返回。

因此,实际的“网络吞吐量”仅基于“获取请求”设置。

【讨论】:

  • 嗨,马蒂亚斯,再次感谢您的意见!现在更有意义了。
【解决方案2】:

更新

Kafka Streams 写入的数据库是这里的一大瓶颈。在我们将它切换到更好的集群(更好的硬件、内存、内核等)之后,我使用下面的配置进行了调整,我每秒能够消耗大约 2k 个事件。提交间隔配置也发生了变化(根据 Augusto 的建议),并且还使用了 G1GC 垃圾收集器。

fetch.max.bytes = 52428800
max.partition.fetch.bytes = 1048576 

fetch.max.wait.ms = 1000 
max.poll.records = 10000 
fetch.min.bytes = 100000
enable.auto.commit = false

【讨论】:

    猜你喜欢
    • 2019-06-14
    • 2018-08-31
    • 2012-07-05
    • 2013-04-16
    • 2023-02-02
    • 2022-01-22
    • 2015-04-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多