【问题标题】:spring integration kafka xml exceptionspring集成kafka xml异常
【发布时间】:2015-12-19 23:49:25
【问题描述】:

我正在使用 Spring Integration Kafka 1.3.0 版本,但我仍然得到如下异常。那么有什么想法可以解决这个问题吗?

Error creating bean with name
'org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0':
Invocation of init method failed; 
nested exception is java.lang.IllegalArgumentException: [Assertion failed] - this argument 
is required; it must not be null

我的 xml 文件是这样的:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    order="3"
        >
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>

<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>

<bean id="producerProperties"
    class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
            <prop key="message.send.max.retries">5</prop>
            <prop key="serializer.class">kafka.serializer.StringEncoder</prop>
            <prop key="request.required.acks">1</prop>
        </props>
    </property>
</bean>

<int-kafka:producer-context id="kafkaProducerContext"
    producer-properties="producerProperties">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="127.0.0.1:9092"
                   topic="test"
                   compression-type="default"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

【问题讨论】:

    标签: spring-integration


    【解决方案1】:

    我不确定该消息来自何处 - 在提出此类问题时,请始终包含完整的堆栈跟踪 - 这样可以更轻松地确定问题。

    我将您的配置粘贴到测试中,但我没有收到该错误,我收到有关缺少键和值的编码器的错误。

    这对我有用...

    <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder" />
    
    <int-kafka:producer-context id="kafkaProducerContext"
        producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration
                key-encoder="kafkaEncoder" value-encoder="kafkaEncoder"
                broker-list="127.0.0.1:9092" topic="test" compression-type="none" />
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
    

    请注意,default 不是受支持的压缩类型;选项当前为nonegzipsnappy

    【讨论】:

    • 谢谢Gary,我解决了,是spring-integration-kafka版本问题。 v1.3.0 完美运行,但据我所知 v1.0.0 缺少一些配置。
    • 1.0.0 已经很老了,不是“缺少”配置,只是在以后的版本中添加了更多配置。
    猜你喜欢
    • 2021-08-02
    • 1970-01-01
    • 2023-03-31
    • 2016-07-09
    • 2019-02-15
    • 2018-05-22
    • 2018-03-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多