【Question Title】: org.apache.kafka.common.errors.RecordTooLargeException in Flume Kafka Sink
【Posted】: 2017-02-28 11:25:25
【Question Description】:

I am trying to read messages from a JMS source and push them to a Kafka topic. After running this for a few hours, I observed that the frequency of pushes to the Kafka topic dropped to almost zero. Some initial analysis turned up the following exception in the Flume logs.

28 Feb 2017 16:35:44,758 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
        ... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

My Flume log shows that the currently configured value of max.request.size is 1048576, which is clearly much smaller than 1399305. Increasing max.request.size should eliminate these exceptions, but I cannot find the right place to update that value.

My flume.config:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.channels.c1.type = file
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 100000000
a1.channels.c1.checkpointDir = /data/flume/apache-flume-1.7.0-bin/checkpoint
a1.channels.c1.dataDirs = /data/flume/apache-flume-1.7.0-bin/data

a1.sources.r1.type = jms

a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = true

a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = some context urls
a1.sources.r1.connectionFactory = some_queue
a1.sources.r1.providerURL = some_url 
#a1.sources.r1.providerURL = some_url
a1.sources.r1.destinationType = QUEUE
a1.sources.r1.destinationName = some_queue_name 
a1.sources.r1.userName = some_user
a1.sources.r1.passwordFile= passwd

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = some_kafka_topic
a1.sinks.k1.kafka.bootstrap.servers = some_URL
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 1
a1.sinks.k1.channel = c1

Any help would be greatly appreciated!

【Question Comments】:

    Tags: apache-kafka flume-ng


    【Solution 1】:

    This change has to be made on the Kafka side. Update the Kafka producer configuration file producer.properties with a larger value, e.g.

    max.request.size=10000000
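
    Note that max.request.size is only the producer-side cap; the broker enforces its own limit on record size as well, so an oversized record can still be rejected server-side. As a minimal companion sketch, assuming you also administer the Kafka cluster (the value here simply mirrors the one above):

    # broker-wide limit, set in server.properties (default is roughly 1 MB)
    message.max.bytes=10000000
    # or per topic, via the topic-level override
    max.message.bytes=10000000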
    

    【Discussion】:

    • I am using Flume, which uses Kafka's producer library to push messages onto the topic, but I cannot see this as something configurable in Flume; do I need to change a hard-coded value in the producer class?
    • @RiteshSharma Are you saying there is no Kafka installation on the server?
    • Actually this "max.request.size" issue is on the Flume side, where Flume acts as the producer; Flume does not ship any dedicated configuration file like "producer.properties", you just have to set the Kafka producer properties in the Flume configuration.
    • Yes, Flume uses Kafka, so my solution was to change this property at the Kafka level, since the max.request.size property belongs to the Kafka producer, which in turn is what Flume uses!
    • Yes, thanks for your help!!
    【Solution 2】:

    It looks like my problem is solved; as suspected, increasing max.request.size eliminated the exceptions. Flume provides the constant prefix kafka.producer. for overriding such Kafka sink (producer) properties; this prefix can be prepended to any Kafka producer property.

    So in my case: a1.sinks.k1.kafka.producer.max.request.size = 5271988
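
    For completeness, a sketch of the sink section from the question with this override in place (the topic and broker names are the same placeholders as in the original config):

    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = some_kafka_topic
    a1.sinks.k1.kafka.bootstrap.servers = some_URL
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.flumeBatchSize = 1
    # everything after the kafka.producer. prefix is passed straight through
    # to the underlying Kafka producer, so this raises max.request.size
    # above the ~1.4 MB record that was failing
    a1.sinks.k1.kafka.producer.max.request.size = 5271988
    a1.sinks.k1.channel = c1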

    【Discussion】:

    • Wow. Never knew this was possible!