【发布时间】:2020-10-13 21:24:18
【问题描述】:
我有一个 Spark Streaming 应用程序和一个 Kafka Streams 应用程序并排运行,用于基准测试。两者都使用相同的输入主题并写入不同的目标数据库。输入主题有 15 个分区,火花流和 kafka 流都有 15 个消费者(比例为 1:1)。此外,事件有效负载约为 2kb。不确定它是否相关,但 Spark Streaming 的 90% 执行时间约为 9 毫秒。卡夫卡流,12 毫秒。每次处理消息时,都会在我的处理器中调用 commit() 方法。
问题依赖于高爆发。 Spark Streaming 可以跟上每秒 700 的速度,而 Kafka Streams 只能达到每秒 60/70 左右。我不能超越。见下图:(绿线 - Spark Streaming / 蓝线 - Kafka Streams)
根据下面的配置,只要每个消费者不超过 1000 个事件,考虑到背压,无论每个分区的字节数如何,火花流都可以跟上。至于 Kafka Streams,如果我正确理解了它的配置(请保持诚实),基于以下相同,我能够每 100 毫秒(poll.ms)获取最多 1000 条记录(max.poll.records),只要每个分区不超过 1MB (max.partition.fetch.bytes) 和每次提取不超过 50MB (fetch.max.bytes)。
无论我使用 5、10 还是 15 个消费者,我都看到相同的结果(每秒停留 70 个事件),这让我认为它与配置相关。我试图通过增加每次提取的记录数和每个分区的最大字节数来调整这些,但我没有得到显着的结果。
我知道这些是不同的技术并用于不同的目的,但我想知道我应该在 Kafka Streams 中使用哪些值来提高吞吐量。
Spark 流配置:
spark.batch.duration=10
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000
spark.streaming.kafka.maxRatePerPartition=100
Kafka Streams 配置(所有字节和时间相关)
# Consumer Config
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
request.timeout.ms = 30000
enable.auto.commit = false
# StreamsConfig
poll.ms=100
处理器代码
public class KStreamsMessageProcessor extends AbstractProcessor<String, String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(String key, String payload) {
ResponseEntity responseEntity = null;
try {
// Do Some processing
} catch (final MyException e) {
// Do Some Exception Handling
} finally {
context.forward(UUID.randomUUID().toString(), responseEntity);
context.commit();
}
}
提前致谢!
【问题讨论】:
-
这里没有专家:我认为问题在于 Kafka 中的提交。可以肯定的是,我在您发布的配置中看不到自动提交被禁用(
enable.auto.commit)。我也很惊讶你消费了 70 个事件而不管消费者的数量......我想知道是否所有记录最终都进入同一个分区,你能验证所有分区之间的负载平衡吗?查看代码可能也有帮助!也可能是主题配置。 -
您好 Augusto,感谢您的意见。只需添加代码以防万一。所有分区都与消费者很好地平衡。是的,自动提交被禁用。如果您想查看其他配置,请告诉我。
标签: spark-streaming kafka-consumer-api apache-kafka-streams spark-streaming-kafka