【问题标题】:RecordTooLargeException in Kafka streams joinKafka 流中的 RecordTooLargeException 加入
【发布时间】:2018-03-17 10:00:33
【问题描述】:

我有一个 KStream x KStream 连接,但出现以下异常。

Exception in thread “my-clicks-and-recs-join-streams-4c903fb1-5938-4919-9c56-2c8043b86986-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_15, processor=KSTREAM-SOURCE-0000000001, topic=my_outgoing_recs_prod, partition=15, offset=9248896
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_15] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:59)
    at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:105)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:107)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:100)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

我将加入 Click 主题和 Recommendation 主题。 Click 对象非常小(小于 1 KB)。另一方面,Recommendation 可能很大,有时会超过 1 MB。

我在 Google 上搜索了异常并发现 (here) 我需要在生产者配置中设置 max.request.size

我不明白的是,生产者在流加入中的位置是什么? topic=my_outgoing_recs_prod 上面的异常中的主题是推荐主题,而不是最终加入的主题。流应用程序不应该只是“消费”它吗?

不过,我尝试将属性设置为 config.put("max.request.size", "31457280");,即 30MB。我预计推荐记录不会超过该限制。尽管如此,代码还是崩溃了。

我无法更改 Kafka 集群中的配置,但如果需要,我可以更改 Kafka 中相关主题的属性。

有人可以建议我还可以尝试什么吗?

如果没有任何效果,我愿意忽略这些过大的消息。但是,我不知道如何处理这个RecordTooLargeException

我执行连接的代码如下。

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, JOINER_ID + "-" + System.getenv("HOSTNAME"));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
config.put("max.request.size", "314572800");
config.put("message.max.bytes", "314572800");
config.put("max.message.bytes", "314572800");


KStreamBuilder builder = new KStreamBuilder();

KStream<String, byte[]> clicksStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), clicksTopic);
KStream<String, byte[]> recsStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), recsTopic);

KStream<String, ClickRec> join = clicksStream.join(
        recsStream,
        (click, recs) -> new ClickRec(click, recs),
        JoinWindows.of(windowMillis).until(3*windowMillis));

join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);

KafkaStreams streams = new KafkaStreams(builder, config);
streams.cleanUp();
streams.start();

ClickRec 是连接对象(它远小于Recommendation 对象,我预计它不会大于几 KB)。

在上面的代码中,我应该在哪里放置 try...catch 以从偶尔出现的超大对象中恢复?

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    不同级别有多个配置:

    1. 您有一个代理设置message.max.bytes(默认为1000012)(参见http://kafka.apache.org/documentation/#brokerconfigs
    2. 有一个主题级别的配置max.message.bytes(默认为1000012)(参见http://kafka.apache.org/documentation/#topicconfigs
    3. 生产者有max.request.size(默认为1048576)(参见http://kafka.apache.org/documentation/#producerconfigs

    您的堆栈跟踪表明,您需要在代理或主题级别更改设置:

    原因:org.apache.kafka.common.errors.RecordTooLargeException:请求包含的消息大于服务器将接受的最大消息大小。

    也许你还需要增加生产者设置。

    你为什么首先需要这个:

    当您执行 KStream-KStream 连接时,连接运算符会建立状态(它必须缓冲来自两个流的记录才能计算连接)。默认情况下,状态由 Kafka 主题支持——本地状态基本上是一个缓存,而 Kafka 主题是事实的来源。因此,您的所有记录都将写入 Kafka Streams 自动创建的“更改日志主题”。

    【讨论】:

    • 我已经编辑了帖子以反映我尝试的配置更改。这是指定这些配置的正确方法吗?还是需要对 Kafka 集群进行这些更改(我不能。我只能控制我的流应用程序配置)?
    • 所以我尝试了上述设置,但仍然得到同样的错误。
    • 如果无法通过我的应用程序中的配置更改来解决此问题,有没有办法在不关闭流连接的情况下捕获该异常并忽略它?
    • 代理/主题配置需要在集群中完成...以防异常。我建议在你写这个话题之前介绍一个flatMap()。在这个 flatMap 中,您使用序列化程序将键和值都转换为 byte[] 类型——因此允许您检查大小,如果大小超过限制,您将删除记录,否则返回序列化记录(即,flatMap 的返回类型将是 &lt;byte[],byte[]&gt;)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-04-05
    • 1970-01-01
    • 2018-11-03
    • 2020-11-04
    • 2020-03-28
    • 1970-01-01
    • 2019-10-11
    相关资源
    最近更新 更多