【问题标题】:Tuning kafka streams for speed调整 kafka 流以提高速度
【发布时间】:2019-10-25 18:53:12
【问题描述】:

我有两个流:

[topicA] -> processingA -> [topicB] -> processingB -> [topicC]

登录我的应用程序后,我注意到在将输出从 processingA 发送到 topicB 和从 topicB 为 processingB 选择消息之间,每次需要超过 100 毫秒(而不是 150 毫秒)。它可能不多,但它会累积起来,最后相当简单的级联处理几乎需要第二次。

我可以调整 kafka 以使这些延迟尽可能接近于零吗? 哪些配置参数对这些延迟有影响?

我大部分都是默认配置。是 commit.interval.ms 导致延迟吗?我已经将它从更高的默认值更改...

StreamsConfig values: 
    application.id = app
    application.server = 
    bootstrap.servers = [localhost:9092]
    buffered.records.per.partition = 1000
    cache.max.bytes.buffering = 10485760
    client.id = 
    commit.interval.ms = 100
    connections.max.idle.ms = 540000
    default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
    default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    num.standby.replicas = 0
    num.stream.threads = 1
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
    poll.ms = 100
    processing.guarantee = exactly_once
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    replication.factor = 1
    request.timeout.ms = 40000
    retries = 0
    retry.backoff.ms = 100
    rocksdb.config.setter = null
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    state.cleanup.delay.ms = 600000
    state.dir = /tmp/kafka-streams
    topology.optimization = none
    upgrade.from = null
    windowstore.changelog.additional.retention.ms = 86400000

【问题讨论】:

  • 如果我理解正确,您可以在 processingA 之后具体化您的结果流。你能避免这种物化吗?您只需在 processingA 之后立即运行您的 processingB (我想它会返回一个 KStream insatnce)
  • @dmkvl 抱歉,但它必须通过主题。还有其他进程也会产生 topicB 消息...

标签: apache-kafka apache-kafka-streams


【解决方案1】:

你的消费者/处理轮询频率是多少?

尝试减少这种情况,以便您的处理器快速消耗数据

您的poll.ms 也设置为 100 毫秒,以减少它,看看是否有帮助。

【讨论】:

  • 似乎效果最好。将 poll.ms 和 commit.interval.ms 减少到 50ms 我能够将延迟减少到 100ms 以下
  • @redguy 很高兴知道这一点
【解决方案2】:

在您的情况下,100 到 150 毫秒似乎很正常,因为您设置了 commit.interval.ms = 100(这也是 processing.guarantee = exact_once 的默认值)。 只有一次 processingB 只会读取已在 [topicB] 中提交的消息,而 processingA 只会在(至少)100 毫秒后提交。

无需重新构建系统,您可以调整 commit.interval.ms 以获得更好的延迟,但如果您降低该值,吞吐量也会降低。

请参阅this post(Streams 性能影响),其中描述了这种折衷。

【讨论】:

  • 我可以将 commit.interval.ms 设置为更小,比如 10 毫秒吗?重新架构仍然是一种选择,但你到底是什么意思?我无法避免发布到主题“topicB”,我需要在这里持久化,并且它是与其他流程的共同点
  • 我还可以看到,这些延迟可能远高于提交间隔——即对于 commit.interval=50ms,我观察到 250ms 延迟......
  • 我看起来你不能简单地重新架构,因为你需要通过 topicB ......如果 250ms 是否正常,我无法提供更多建议,这真的取决于你的 kafka 集群。您可以测量集群上的延迟和吞吐量,以获得能够测量使用 kafka 流添加的开销的基线。
  • 不建议将提交间隔设置得太低,因为这意味着 Kafka Streams 和代理的开销很大,并且可能会显着影响您的吞吐量。 -- 问题是,延迟从何而来(读还是写?)。也许您可以减少生产者的linger.ms 配置?或者减少生产者的批量大小?对于消费者规模,max.poll.records 配置可能会减少延迟。请注意,减少所有这些设置会影响吞吐量(这是典型的吞吐量与延迟权衡)。
  • linger.ms 设置为 0,网络延迟无关紧要,因为我在单机上的 docker 映像中运行 kafka 和 zookeeper。将尝试尝试其他提到的配置
猜你喜欢
  • 2014-04-09
  • 1970-01-01
  • 2017-10-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多