【Question title】: Spring Cloud Stream with KTable Binder configuration - Throws java.lang.IllegalArgumentException: Method must be declarative
【Posted】: 2021-03-19 16:13:25
【Question】:

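I need to use a KTable with the Spring Cloud Stream Kafka binder configuration. Below is sample code that reads data from a topic and prints it to the console.

However, the application terminates at startup with java.lang.IllegalArgumentException: Method must be declarative.

I referred to the question "Spring Cloud Stream Kafka - Method must be Declarative", but the result is still the same.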

Java - 11

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.4</version>
</parent>
<properties>
    <java.version>11</java.version>
    <spring-cloud.version>2020.0.2</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Binding interface:

public interface WordCountBinding {

    @Input(value = " data-input-channel")
    public KTable<String, String> readDataStream();
}

Listener:

@Service
@EnableBinding(value = WordCountBinding.class)
public class WordCountListener {

    @StreamListener(value = "data-input-channel")
    public void listen(KTable<String, String> data) {

        KStream<String, String> wordStream = data.filter((key,value) -> key.contains("SRS")).toStream();
        wordStream.foreach((key, value) -> System.out.println("Key: " + key + " Value: " + value));
    }
}

Console output:

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-03-19 21:26:53.598 ERROR 1912 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalArgumentException: Method must be declarative
    at org.springframework.util.Assert.isTrue(Assert.java:121) ~[spring-core-5.3.5.jar:5.3.5]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:434) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:161) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:232) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:202) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:336) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:118) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:963) ~[spring-beans-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
    at com.shasr.streamwordcount.StreamWordcountApplication.main(StreamWordcountApplication.java:10) ~[classes/:na]

2021-03-19 21:26:53.613  INFO 1912 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'

Process finished with exit code 1

【Discussion】:

    Tags: apache-kafka-streams spring-kafka spring-cloud-stream spring-cloud-stream-binder-kafka


    【Solution 1】:

    You can get rid of that specific exception by declaring the method parameter with @Input, as shown below.

    @StreamListener
    public void listen(@Input("data-input-channel") KTable<String, String> data) {
        // ... same body as in the question
    }

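    More importantly, you should avoid StreamListener altogether, since it has been deprecated as of the 3.1.x line of Spring Cloud Stream. The recommended approach is the functional style. See the reference docs for details.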
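
As a rough sketch of what the functional style could look like for the listener in the question: the consumer is exposed as a `java.util.function.Consumer` bean, and neither `@EnableBinding` nor the `WordCountBinding` interface is needed. The class name `WordCountFunctions` and the bean name `process` are assumptions chosen for illustration. It relies on the `spring-cloud-stream-binder-kafka-streams` dependency already present in the question's pom.

```java
import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KTable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical replacement for WordCountBinding + WordCountListener.
@Configuration
public class WordCountFunctions {

    // The bean name ("process") is an assumption; the binder derives the
    // input binding name from it, which would be "process-in-0" here.
    @Bean
    public Consumer<KTable<String, String>> process() {
        return table -> table
                // Same filter as in the question: keep records whose key contains "SRS".
                .filter((key, value) -> key.contains("SRS"))
                .toStream()
                // Print each surviving record to the console.
                .foreach((key, value) ->
                        System.out.println("Key: " + key + " Value: " + value));
    }
}
```

With this bean name, the topic would be mapped through the property `spring.cloud.stream.bindings.process-in-0.destination`, set to whatever topic the application reads from (the question does not name it).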

    【Comments】:
