【问题标题】:How to improve the performance to read from kafka and forward to kafka with kafka Stream Application如何使用 kafka Stream Application 提高从 kafka 读取和转发到 kafka 的性能
【发布时间】:2017-12-19 11:55:42
【问题描述】:

我有带有 1.0.0 Kafka 流 API 的 Kafka 流应用程序。我有单个代理 0.10.2.0 kafka 和单个分区的单个主题。除生产者 request.timeout.ms 外,所有可配置参数都相同。我将生产者 request.timeout.ms 配置为 5 分钟来修复 Kafka Streams program is throwing exceptions when producing 问题。

在我的流应用程序中,我从 Kafka 读取事件,处理它们并转发到同一个 kafka 的另一个主题。

计算统计数据后,我观察到处理占用了 5% 的时间,剩下的 95% 时间用于读取和写入。

即使我在 Kafka 中有数千万个事件,有时 Kafka poll 会返回个位数的记录,有时 Kafka poll 会返回数千条记录。

有时上下文转发需要更多时间来向 kafka 发送更少的记录,而有时上下文转发需要更少的时间来向 kafka 发送更多记录。

我尝试通过增加 max.poll.records,poll.ms 值来提高阅读性能。但没有运气。

如何在阅读和转发时提高性能? kafka poll 和 forward 将如何工作?哪些参数有助于提高性能?

以下是我的应用程序中几个重要的生产者配置参数。

acks = 1
batch.size = 16384
buffer.memory = 33554432
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 240000
retries = 10
retry.backoff.ms = 100
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
transaction.timeout.ms = 60000
transactional.id = null

以下是我的应用程序中几个重要的消费者配置参数:

auto.commit.interval.ms = 5000
auto.offset.reset = earliest
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
internal.leave.group.on.close = false
isolation.level = read_uncommitted
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 10000
metadata.max.age.ms = 300000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000

以下是我的应用程序中几个重要的流配置参数:

application.server =
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
commit.interval.ms = 30000
connections.max.idle.ms = 540000
key.serde = null
metadata.max.age.ms = 300000
num.standby.replicas = 0
num.stream.threads = 1
poll.ms = 1000
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =

【问题讨论】:

  • 嗨,你有什么有趣的发现吗?

标签: apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams


【解决方案1】:

您可以通过控制键和增加主题的分区数来在操作中引入并行性。

以上内容会增加并行处理的 Kafka 流的数量。这可以通过增加 Kafka 流应用程序的线程数来解决

【讨论】:

    【解决方案2】:

    您可以在不同的线程中创建多个 Kafka 消费者,并将其分配给同一个消费者组。它们会并行消费消息,不会丢失消息。

    您如何发送消息? 使用 Kafka,您可以以“即发即弃”的方式发送消息:它提高了吞吐量。

    producer.send(record);
    

    acks 参数控制在生产者认为写入成功之前必须接收到多少个分区副本。

    如果您设置 ack=0,则生产者在假定消息发送成功之前不会等待代理的回复。但是,由于生产者不等待服务器的任何响应,它可以以网络支持的速度发送消息,因此可以使用此设置来实现非常高的吞吐量。

    【讨论】:

      猜你喜欢
      • 2023-03-12
      • 2019-03-16
      • 2020-10-17
      • 1970-01-01
      • 1970-01-01
      • 2023-02-20
      • 1970-01-01
      • 2017-09-10
      • 2018-01-19
      相关资源
      最近更新 更多