【问题标题】:How to handle exceptions in Kafka sink?如何处理 Kafka sink 中的异常?
【发布时间】:2019-02-17 21:44:14
【问题描述】:

我有一个将数据写入 Kafka 的 Flink 作业。 Kafka 主题的最大消息大小设置为 5 MB,因此如果我尝试写入任何大于 5 MB 的记录,它会引发以下异常并导致作业停止。

java.lang.Exception: Failed to send data to Kafka: The request included a message larger than the max message size the server will accept.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:350)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

现在我已经在我的作业中配置了检查点,所以如果作业失败,它会再次重新启动。问题是,每次重新启动时,它都会因相同的记录而失败,并进入无限循环的失败和重新启动。有没有办法在我的代码中处理这个 Kafka 异常,以免影响整个工作?

【问题讨论】:

    标签: apache-kafka apache-flink flink-streaming


    【解决方案1】:

    也许您可以在 Kafka 接收器前面引入一个过滤器,以检测并过滤掉太大的记录。有点hacky,但它可能很容易。否则我会考虑扩展 FlinkKafkaProducer010 以便能够处理异常。

    【讨论】:

    • 非常感谢。我能够通过扩展 FlinkKafkaProducer010 来处理异常 :-)
    • @david 我对 s3 flyestream sink 有类似的问题。如果其中一条记录与 avro 模式不匹配,则我的检查点失败并且它进入无限循环。如何跳过导致异常的记录并继续前进。
    猜你喜欢
    • 2018-09-29
    • 1970-01-01
    • 2020-05-27
    • 1970-01-01
    • 2018-07-08
    • 2019-10-20
    • 2020-10-21
    • 2021-05-05
    • 1970-01-01
    相关资源
    最近更新 更多