Posted: 2019-10-25 18:53:12
Question:
I have two streams:
[topicA] -> processingA -> [topicB] -> processingB -> [topicC]
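From my application's logs, I noticed that between processingA sending its output to topicB and processingB picking the message up from topicB, each hop takes more than 100 ms (closer to 150 ms). That may not sound like much, but it adds up, and in the end a fairly simple cascade of processing takes almost a second.
Can I tune Kafka to bring these delays as close to zero as possible? Which configuration parameters affect them?
I am mostly using the default configuration. Is commit.interval.ms causing the delay? I have already lowered it from its higher default value...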
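A rough model of why commit.interval.ms could matter here, under stated assumptions: with processing.guarantee = exactly_once, Kafka Streams writes output inside transactions and consumes with read_committed isolation, so a record produced by processingA only becomes visible to processingB after processingA's task commits. The sketch below is a simplification: the class and method names are hypothetical, commits are assumed to happen exactly every commitIntervalMs, and poll and processing time are ignored.

```java
public class EosHopLatency {

    // Simplified model (assumption): a record written inside a transaction is
    // visible to a read_committed consumer only at the producer's next commit.
    // Commits are assumed to fire at every multiple of commitIntervalMs, so a
    // record produced at producedAtMs waits until the next such multiple.
    static long visibilityDelayMs(long producedAtMs, long commitIntervalMs) {
        long nextCommit = ((producedAtMs / commitIntervalMs) + 1) * commitIntervalMs;
        return nextCommit - producedAtMs;
    }

    // Worst case for a chain of hops: each hop may wait a full commit interval.
    static long worstCaseCommitWaitMs(int hops, long commitIntervalMs) {
        return hops * commitIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println(visibilityDelayMs(5, 100));      // 95
        System.out.println(visibilityDelayMs(99, 100));     // 1
        System.out.println(worstCaseCommitWaitMs(2, 100));  // 200 (topicA -> topicB -> topicC)
    }
}
```

Under this model, a per-hop delay of roughly 100 ms is what commit.interval.ms = 100 alone would produce in the worst case, before adding poll and processing time.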
StreamsConfig values:
application.id = app
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 100
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
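The latency-relevant settings from the dump above could be overridden programmatically roughly as follows. This is a minimal sketch: it uses plain string keys in java.util.Properties so it compiles without kafka-streams on the classpath (a real application would normally use the StreamsConfig constants and StreamsConfig.producerPrefix), the class name is hypothetical, and the override values are illustrative assumptions rather than measured recommendations.

```java
import java.util.Properties;

public class LowLatencyStreamsConfig {

    // Builds a Streams configuration with latency-oriented overrides.
    // Values below are illustrative assumptions, not tuned recommendations.
    static Properties lowLatencyProps() {
        Properties props = new Properties();
        props.put("application.id", "app");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("processing.guarantee", "exactly_once");
        // Under exactly_once, downstream read_committed consumers see records only
        // after the producing task commits, which happens every commit.interval.ms.
        props.put("commit.interval.ms", "10");
        // Maximum time a stream thread blocks in poll() when no input is available.
        props.put("poll.ms", "10");
        // Disable the record cache (only affects stateful operations such as KTables).
        props.put("cache.max.bytes.buffering", "0");
        // The "producer." prefix forwards linger.ms to the embedded producer.
        props.put("producer.linger.ms", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = lowLatencyProps();
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + " = " + props.getProperty(k)));
    }
}
```

Lowering commit.interval.ms under exactly_once shortens the wait for each transaction commit, at the cost of more frequent commits, which adds broker load and reduces throughput.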
Comments:
-
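If I understand correctly, you materialize your result stream after processingA. Can you avoid that materialization? You could simply run processingB directly after processingA (I assume it returns a KStream instance).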
-
@dmkvl Sorry, but it has to go through a topic. There are other processes that also produce topicB messages...
Tags: apache-kafka apache-kafka-streams