【发布时间】:2020-09-23 02:59:30
【问题描述】:
我们在项目中使用 Kafka 流状态存储,我们想要存储超过 1MB 的数据,但出现以下异常:
序列化时消息为 1760923 字节,大于 您使用 max.request.size 配置的最大请求大小 配置。
然后我点击链接Add prefix to StreamsConfig to enable setting default internal topic configs 并添加了以下配置:
topic.max.request.size=50000000
然后应用程序工作正常,当状态存储内部主题已创建但重新启动Kafka并且状态存储主题丢失/删除时,它可以正常工作,然后Kafka流处理器需要创建内部状态存储主题启动应用程序时会自动抛出异常,提示:
"Aorg.apache.kafka.streams.errors.StreamsException: Could not create topic data-msg-seq-state-store-changelog. at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:148)....
.....
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: max.request.size".
解决方案是我们可以手动创建内部主题,但这应该不是一个好方法。
你能帮我解决这个问题吗?如果我错过了任何配置?
非常感谢。
2020 年 6 月 17 日更新:仍未解决问题。有人可以帮忙吗?
【问题讨论】:
-
max.request.size是生产者配置,而不是主题配置:kafka.apache.org/documentation/#max.request.size -
是的。这是否意味着我需要为代理上的生产者配置这个?或者我误解了一些东西。因为这是内部话题。
-
意思是,需要在producer上配置。 -- 在内部,Kafka Streams 使用 Kafka cosumer/producer/admin 客户端,您可以像直接使用这些客户端一样配置它们:docs.confluent.io/current/streams/developer-guide/…
-
其实我已经为生产者配置了max.request.size,如果我没有为内部主题(changelog)配置它,那么它会抛出异常。
标签: apache-kafka apache-kafka-streams