【问题标题】:Spring integration kafka how to handle error when produce to kafkaSpring集成kafka如何在生产到kafka时处理错误
【发布时间】:2016-10-10 23:40:23
【问题描述】:

在使用 int-kafka:outbound-channel-adapter 生产到 kafka 时,似乎没有可用的错误通道。在这种情况下,如何处理 reties 次后无法生成到 kafka 的消息?

任何可能导致生产卡夫卡失败的错误。 (以下代码只是网上的sn-p代码,只是想知道如何为其添加错误句柄)

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    channel="inputToKafka"
                                    topic="test">
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>

<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>

<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProps">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="XXXXXX:6667"
                   key-class-type="java.lang.String"
                   value-class-type="java.lang.String"
                   topic="rating"
                   value-serializer="kafkaSerializer"
                   key-serializer="kafkaSerializer"
                   compression-type="none"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>


<util:properties id="producerProps">
    <prop key="queue.buffering.max.ms">500</prop>
    <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
    <prop key="queue.buffering.max.messages">10000</prop>
    <prop key="retry.backoff.ms">100</prop>
    <prop key="message.send.max.retries">2</prop>
    <prop key="send.buffer.bytes">5242880</prop>
    <prop key="socket.request.max.bytes">104857600</prop>
    <prop key="socket.receive.buffer.bytes">1048576</prop>
    <prop key="socket.send.buffer.bytes">1048576</prop>
    <prop key="request.required.acks">1</prop>
</util:properties>

【问题讨论】:

    标签: spring spring-integration spring-kafka


    【解决方案1】:

    请关注这个问题的答案exception handling in int-jpa:updating-outbound-gateway

    由于您有&lt;polled&gt;,您可以指定error-channel 并处理该通道上的流中的错误。

    【讨论】:

      猜你喜欢
      • 2015-05-24
      • 1970-01-01
      • 2020-07-14
      • 1970-01-01
      • 2017-09-07
      • 2023-03-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多