【问题标题】:A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel'已请求默认活页夹,但没有可用于“org.springframework.cloud.stream.messaging.DirectWithAttributesChannel”的活页夹
【发布时间】:2021-12-05 22:19:45
【问题描述】:

我正在尝试使用 Spring Cloud + Kafka Streams + Spring Boot 2 创建尽可能简单的 hello world。

我意识到我错过了基本概念。基本上,我明白:

1 - 我需要定义一个出站流以将消息写入 Kafka 主题,并定义一个入站流以从 Kafka 主题读取消息

public interface LoansStreams {

    String INPUT = "loans-in";
    String OUTPUT = "loans-out";

    @Input(INPUT)
    SubscribableChannel inboundLoans();

    @Output(OUTPUT)
    MessageChannel outboundLoans();

}

2 - 配置 Spring Cloud Stream 以绑定到我的流

@EnableBinding(LoansStreams.class)
public class StreamsConfig {
}

3 - 配置 Kafka 属性

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        loans-in:
          destination: loans
          contentType: application/json
        loans-out:
          destination: loans
          contentType: application/json

4 - 为交换消息创建模型

@Getter @Setter @ToString @Builder
public class Loans {
    private long timestamp;
    private String result;
}

5 - 写入卡夫卡

@Service
@Slf4j
public class LoansService {
    private final LoansStreams loansStreams;
    public LoansService(LoansStreams loansStreams) {
        this.loansStreams = loansStreams;
    }
    public void sendLoan(final Loans loans) {
        log.info("Sending loans {}", loans);
        MessageChannel messageChannel = loansStreams.outboundLoans();
        messageChannel.send(MessageBuilder
                .withPayload(loans)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}

6 - 听 Kafka 主题

@Component
@Slf4j
public class LoansListener {

    @StreamListener(LoansStreams.INPUT)
    public void handleLoans(@Payload Loans loans) {
        log.info("Received results: {}", loans);

    }
}

我花了一整天的时间阅读一些博客,我认为上面的代码至少是可行的。我不确定我是否真的尽可能地编写了最好的方法。顺便说一句,我得到了主题中提到的错误:

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 18:33:05.619 ERROR 14784 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.

谷歌搜索解决方案,我发现有人说要对 StreamListe 编码返回模型,所以我将其替换为:

@StreamListener(LoansStreams.INPUT)
@SendTo("loans-out")
public KStream<?, Loans> process(KStream<?, Loans> l) {
    log.info("Received: {}", l);
    return l;
}

然后我得到一个至少对我来说不太清楚的错误(以前的错误清楚地提到了一些活页夹问题):

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 19:01:06.016 ERROR 13276 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalArgumentException: Method must be declarative
        at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:510) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:168) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
        at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:226) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]

如果它以某种方式有所帮助,我想将这个想法演变为应用 SAGAS,但这不是这个问题的重点。首先,我需要启动并运行基本功能。

*已编辑

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.mybank</groupId>
    <artifactId>kafka-cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-cloud-stream</name>
    <description>Spring Cloud Stream With Kafka</description>

    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <!-- version>5.1.5.RELEASE</version-->
        </dependency>

    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

【问题讨论】:

  • 您不需要在StreamListener 中包含KStream,因为您没有任何KStream 绑定。您在构建配置中包含什么活页夹? (maven/gradle 等)你如何包含它们?我们可以看到一个示例配置吗?您的步骤 1 - 6 应该可以工作,可能会遗漏一些东西。一个小的示例应用程序会有所帮助。
  • 是的,您的接口是用于消息通道绑定器,而不是 KStream 绑定器。在应用程序中读取和写入同一主题也有点奇怪。你会得到一个无限循环。

标签: spring-boot apache-kafka apache-kafka-streams spring-cloud-stream


【解决方案1】:

“已请求默认绑定器,但没有可用的绑定器...”,请添加如下依赖项。

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

【讨论】:

  • 我的 pom.xml 中已经有了 spring-cloud-stream-binder-kafka-streams。我将在上面添加我的整个 pom
  • 请注意 spring-cloud-stream-binder-kafka 而不是 spring-cloud-stream-binder-kafka-streams
  • 你可以同时添加它们,spring-cloud-stream-binder-kafka 是用于 kafka 客户端 api 的;而 spring-cloud-stream-binder-kafka-streams 用于 kafka 流 api。
  • @jimC - 只有当您的应用程序是基于 Kafka Streams 的应用程序时,您才使用 ..binder-kafka-streams。我的意思是,您在应用程序中使用 kafka 流库,并使用 kafka 流提供的高级 DSL 或低级处理器 API。当您想利用 Spring Integration、基于 Spring Cloud Function/Project Reactor 的构造时,您可以使用 ..binder-kafka。它们在引擎盖下都是非常不同的粘合剂。
  • 一个简短的提示。 Kakfa客户端用于生产者(Source)和消费者(Sink),转换(INPUT->OUTPUT),进程(INPUT->OUTPUT)并绑定到消息通道,您可以使用Rabbit binder。 Kafka 流是应用程序流程的高级替代方案 - 加入、分支、过滤、映射值等。并直接绑定到kafka topic。
【解决方案2】:

您可以在 application.yml(或 application.properties)中定义您的默认绑定器

spring:
  cloud:
    stream:
      bindings:
        ...
      default-binder: kafka

【讨论】:

  • 这个案例对我有用,因为我在同一上下文中有两个活页夹。
  • 你也可以像这样使用'defaultBinder: kafka'
【解决方案3】:

对我来说,对于不同的上下文和多个输出绑定使用不同的 application.properties,我可以修复它的唯一方法是定义一个通用的默认绑定,例如:

spring:
  cloud:
    stream:
      default-binder: eventhub
      ...

其余的绑定类型也在每个输入/输出中单独设置。

【讨论】:

    【解决方案4】:

    在上面的pom文件中,需要使用binder-kafka而不是binder-kafka-streams

    所以替换

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            </dependency>
    

             <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
    

    【讨论】:

      猜你喜欢
      • 2021-07-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-12-22
      • 1970-01-01
      • 1970-01-01
      • 2018-12-07
      • 2011-10-04
      相关资源
      最近更新 更多