【问题标题】:Spring Integration Message Driven Channel Adapter not working with Spring-Kafka 2.3+Spring Integration Message Driven Channel Adapter 不适用于 Spring-Kafka 2.3+
【发布时间】:2021-03-25 14:14:14
【问题描述】:

在尝试让消息驱动通道适配器与 Spring-Kafka 2.3+ 一起使用时,我遇到了以下问题。有人有任何可以帮助我的示例代码吗?

1. org.springframework.kafka.listener.config.ContainerProperties 实际上并不存在。

2。 org.springframework.kafka.listener.ContainerProperties 确实存在,但在尝试运行时会产生以下问题。

说明:

试图调用一个不存在的方法。尝试是从以下位置进行的:

org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.onInit(KafkaMessageDrivenChannelAdapter.java:318)

以下方法不存在:

org.springframework.kafka.listener.ContainerProperties.isDeliveryAttemptHeader()Z

3.如果您使用 kafka 2.5 及更高版本,则会出现此问题,但会被替换为 2021-03-22 13:56:05.102-0400 org{local_sparta} WARN [data-pipeline,,,] [DP-ACCOUNT] [DPA] [] AnnotationConfigServletWebServerApplicationContext:main 上下文初始化期间遇到异常 - 取消刷新尝试:org. springframework.beans.factory.BeanCreationException:创建名为“org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration”的bean时出错:通过构造函数实例化bean失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration]:构造函数抛出异常;嵌套异常是 org.springframework.beans.factory.UnsatisfiedDependencyException:在类路径资源 [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class] 中定义的名称为“kafkaTemplate”的 bean 创建时出错:通过方法“kafkaTemplate”表达的依赖关系不满足参数0;嵌套异常是 org.springframework.beans.factory.NoSuchBeanDefinitionException:没有 'org.springframework.kafka.core.ProducerFactory' 类型的合格 bean 可用:预计至少有 1 个 bean有资格成为 autowire 候选人。依赖注释:{}

尝试使用 Java 版本和 XML 版本都给出相同的错误。

Java 版本

@Configuration
@Slf4j
public class KafkaChannelConsumer {

    @Autowired
    MessageChannel preRouterLOB;

    @Value("${spring.kafka.bootstrap-servers:localhost9092}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic:55iptest}")
    private String springIntegrationKafkaTopic;

    @Bean
    public KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter() {
        KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(
                kafkaListenerContainer());
        kafkaMessageDrivenChannelAdapter.setOutputChannel(preRouterLOB);
        return kafkaMessageDrivenChannelAdapter;
    }

    @SuppressWarnings("unchecked")
    @Bean
    public ConcurrentMessageListenerContainer kafkaListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties(springIntegrationKafkaTopic);

        return (ConcurrentMessageListenerContainer) new ConcurrentMessageListenerContainer(
                consumerFactory(), containerProps);
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerConfigs());
    }

    @Bean
    public Map consumerConfigs() {
        Map properties = new HashMap();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dummy");
        return properties;
    }
}

XML 版本

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:int="http://www.springframework.org/schema/integration"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">


<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>

第 1 期和第 2 期的 POM

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>5.4.5</version>
        </dependency>

这包括版本 Spring-Kafka 2.3.6

第 3 期的 POM

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>5.4.5</version>
    <exclusions>
       <exclusion>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
       </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.7</version>
</dependency>
       

【问题讨论】:

  • 谢谢,这一切都解决了,现在问题与我们在幕后使用的 Spring Boot 版本有关,它与我尝试使用的 spring-integration-kafka 不兼容。

标签: apache-kafka spring-integration spring-kafka


【解决方案1】:

&lt;version&gt;5.4.5&lt;/version&gt;

这包括版本 Spring-Kafka 2.3.6

不,它没有; spring-integration-kafka 的 5.4.x 版本需要 2.6.x;该方法已添加到 2.5 中的属性中。

查看项目页面了解兼容版本。

https://spring.io/projects/spring-kafka

如果你使用的是 Spring Boot,它会引入所有正确的版本,你根本不应该在你的 pom 中指定版本。

对于问题 3,您似乎在某处声明了与 kafka 模板 bean 不兼容的生产者工厂。

【讨论】:

  • 感谢您的帮助 spring-integration-kafka 5.4.5 肯定会引入 spring-kafka 版本 2.3.6,即使您在网站上正确指出 spring.io/projects/spring-kafka 它指定两者不是兼容的。我会调查你为问题 3 提出的问题
  • 如您所见;它“拉入” 2.6.7。 repo1.maven.org/maven2/org/springframework/integration/… 查看 maven 依赖树,了解这是如何发生的
  • 这是公司的事情。抱歉,这是因为我们没有直接使用弹簧靴,感谢您的帮助
  • 我发现问题出在 Spring Boot 版本上,因为我们在幕后使用的只有 2.2.X。更新后,现在看起来我已经解决了提到的问题。
【解决方案2】:

看起来你在搞乱不同的版本。

由于您的项目基于 Spring Boot,因此您肯定必须依赖其提供的依赖项的版本。某些版本组合确实不兼容。例如,ContainerPropertiesdeliveryAttemptHeader 属性以 2.5 开头:

/**
 * Set to true to populate the
 * {@link org.springframework.kafka.support.KafkaHeaders#DELIVERY_ATTEMPT} header when
 * the error handler or after rollback processor implements
 * {@code DeliveryAttemptAware}. There is a small overhead so this is false by
 * default.
 * @param deliveryAttemptHeader true to populate
 * @since 2.5
 */
public void setDeliveryAttemptHeader(boolean deliveryAttemptHeader) {

只要确保你依赖 Spring Boot 插件及其依赖管理。 Spring Boot 中的所有部门都是一起测试的。只有当您尝试将 deps 更改为您自己的版本时会遇到的问题。

【讨论】:

  • 我相信使用 &lt;dependency&gt; &lt;groupId&gt;org.springframework.integration&lt;/groupId&gt; &lt;artifactId&gt;spring-integration-kafka&lt;/artifactId&gt; &lt;/dependency&gt; 会给我从 spring boot 中正确的版本是正确的
  • 正确。这就是 Spring Boot 依赖管理的目的。
  • 感谢 Aterm。我问是因为当我尝试上述方法时,我收到错误无法解析 org.springframework.integration:spring-integration-kafka:unknown。我认为这可能是因为我们并没有真正使用 Spring Boot,而是我们自己的公司版本。对于其他依赖项,这确实有效,所以也许我将不得不要求添加它。
  • 好的。然后,您需要咨询官方 Spring Boot POM 以了解您可以在项目中使用的版本。例如:docs.spring.io/spring-boot/docs/current/reference/html/…
猜你喜欢
  • 2017-07-03
  • 1970-01-01
  • 1970-01-01
  • 2017-04-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-07-21
相关资源
最近更新 更多