【问题标题】:Spring cloud stream rabbitmq binder - spring cloud function error handlingspring cloud streamrabbitmq binder-spring cloud函数错误处理
【发布时间】:2020-06-11 13:01:51
【问题描述】:

我正在使用带有 spring cloud 功能的 spring cloud stream rabbit binder 并定义如下监听器:

public Function<Flux<SomeObject>, Flux<OtherObject>> foo() {
//some code
}

我还将失败的消息重新路由到 DLQ。问题是当像org.springframework.messaging.converter.MessageConversionException 这样的致命错误发生时。它不会像https://docs.spring.io/spring-amqp/reference/html/#exception-handling 中提到的那样被ConditionalRejectingErrorHandler 处理,并且永远循环。

有没有办法使用ConditionalRejectingErrorHandler 进行这项工作?

现在我通过使用@ServiceActivator(inputChannel = "errorChannel") 并自己处理错误来解决问题。

依赖关系:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
        <dependency>
            <groupId>org.springframework.boot.experimental</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-consul-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-hateoas</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-web</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
</dependencies>

【问题讨论】:

    标签: java spring-boot spring-cloud-stream spring-rabbit spring-cloud-function


    【解决方案1】:

    长期以来,我们一直在讨论用于命令式函数的错误处理和其他功能,以及它们如何应用于(或者甚至可以应用于)反应式函数,并尝试了几种不同的方法,但不幸的是,这一切都归结为阻抗不匹配。

    您所描述的方法是基于对单个消息的操作。这是命令式消息处理程序(例如Function&lt;String, String&gt;)中的工作单元。您使用响应式样式,并通过这样做将工作单元从流中的单个消息更改为整个流。

    简而言之:

    - Function<?, ?> - unit of work is Message
    - Function<Flux<?>, Flux<?>> - unit of work is the entire stream
    

    您还可以轻松地观察到它,因为响应式函数在应用程序的生命周期内仅调用一次,而命令式函数在每个到达的消息中调用一次。 我这么说的原因是我们用于命令式消息处理程序(函数)的基于框架的方法不能应用于响应式而不引起副作用。通常反应式开发人员理解这一点,特别是考虑到反应式 API 的丰富性,特别是在错误处理方面

    无论如何,我们都会相应地更新文档。

    【讨论】:

    • 但它确实适用于@StreamListener("foo"),然后是public void foo(Flux&lt;Object&gt; input),不适用于Function&lt;?, ?&gt;,所以我认为这更像是一个“云功能”问题。无论如何,@ServiceActivator(inputChannel = "errorChannel") 方法是否正确?
    • 正如我所说,我们也让它与函数一起工作,但在解决一个问题时,我们又创建了几个问题。例如,我们会告诉想要对流进行分组、过滤、聚合、窗口化等的高级响应式用户什么?所有这些案件都是半破的。这些是为什么有人会选择使用响应式的真实案例,此时它已成为一个问题。
    猜你喜欢
    • 2019-10-03
    • 2021-12-11
    • 1970-01-01
    • 1970-01-01
    • 2019-04-19
    • 1970-01-01
    • 1970-01-01
    • 2021-11-26
    • 1970-01-01
    相关资源
    最近更新 更多