【发布时间】:2022-01-22 10:50:10
【问题描述】:
我正在开发一个系统,该系统将大量事件数据从数据库导入到 Apache Kafka,然后我的 Java 应用程序处理这些数据,然后将消息放回 Kafka 主题。
我正在使用 Debezium 和 Kafka Connect 将数据导入 Kafka。然后来自 Debezium 的数据由我的消费者应用程序获取。这个消费者必须从单个分区中读取,因为我需要来自数据库的排序保证。问题是单个消费者跟不上Debezium生产者,所以我的消息越来越延迟。是否有可能以某种方式提高这个消费者的速度?提高消费者速度和吞吐量的最重要配置是什么?
我的 Debezium 消息不包含架构信息,因此它们并不大。 我的消费者是使用具有以下配置的 Kafka-Streams 库实现的:
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
properties.put(StreamsConfig.POLL_MS_CONFIG,50);
properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
我的这个消费者的拓扑如下:
public Topology createTopology(String debeziumTopic, String otherTopic) {
JsonDebeziumSerde jsonDebeziumSerde = new JsonDebeziumSerde();
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream(debeziumTopic, Consumed.with(Serdes.String(), jsonDebeziumSerde))
.foreach((k, v) -> {
try {
String id = v.get("ID").textValue();
kafkaTemplate.send(otherTopic, id, v);
} catch (NullPointerException ex) {
log.warn(debeziumTopic + " has empty message");
}
});
Topology topology = streamsBuilder.build();
return topology;
}
我的代理配置:
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
zookeeper.connection.timeout.ms=1000
log.retention.hours=1
num.partitions=10
delete.topic.enable=true
【问题讨论】:
-
您不应使用 KafkaProducer(或 Spring KafkaTemplate)从您的 Kafka Streams 拓扑中生成消息。 KafkaStreams 本身具有嵌入式生产者,当您使用“to(...)”指令流式传输结果时,将使用这些嵌入式生产者。不确定这是否能解决您的性能问题,但它会给您带来诸如 Exactly Once Processing 之类的好处。
-
消息/秒或兆字节/秒的吞吐量是多少?要了解您的集群支持什么,您可以与 kafka-producer-perf-test 和 kafka-consumer-perf-test 命令行实用程序进行比较。
-
“您不应该使用 KafkaProducer(或 Spring KafkaTemplate)从您的 Kafka Streams 拓扑中生成消息。” _ 你说得对,我改变了这个拓扑,现在我使用
.stream().to()而不是 kafkaTemplate 老实说,我不知道如何检查我的 java 应用程序流的通过率
标签: java apache-kafka apache-kafka-streams debezium