【问题标题】:Spring Integration Processor弹簧集成处理器
【发布时间】:2022-01-20 17:50:06
【问题描述】:

我想定义一个写入 MongoDB 的流,并且只有在成功时才会将 ID 写入 Kafka。我正在使用 JavaDSL,我希望有一个 FlowBuilder 类来定义我的高级管道。我正在寻找能够让我编写流程的功能,例如:

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .process(writeToMongo) // <-- Searching for this kind of function
      .handle(writeToKafka)
      .get();
}

我已经看到Apache Camel works exactly like this,我想知道 Spring Integration 是否也有一个简单而好的解决这个基本问题的方法。

【问题讨论】:

    标签: spring spring-boot spring-integration project-reactor


    【解决方案1】:

    您正在寻找的是publishSubscribeChannel(),它可以拥有多个订阅者。默认情况下,如果没有在频道上配置执行器,下一个订阅者只会在前一个订阅者之后并且只有当这个订阅者成功时才会被调用。

    它可能看起来和你用 process() 表达的相似:

    public IntegrationFlow buildFlow() {
       return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
          .publishSubscribeChannel(c -> c
                            .subscribe(sf -> sf
                                    .handle(MongoDb.reactiveOutboundChannelAdapter())) 
          .handle(writeToKafka)
          .get();
    }
    

    另一个选项是gateway(),但是你需要从那里返回一些东西才能继续。在 Spring Integration 中,如果没有回复,则流程将停止。如果没有out,它没有将in 用于out 的概念。

    【讨论】:

    • 谢谢。我已经看到in the docs 在解释gateway() 时有对Reactor 的引用,但没有任何IntegrationFlows 用法。你会推荐以这种方式构建响应式流程吗?还是我应该保持你在这里写的同样的方式?
    • 如果可以的话,我想问另一个问题:I've seen here 您写道,Spring Integration 中的 java DSL 已不受支持。这是真的吗?我已经看到 Java DSL project 真的不活跃
    • 我认为 Reactor 与您的问题无关。您询问了 Camel 的等效流程,我向您解释了 Spring Integration 是如何实现的。我认为带有 Reactor 的网关值得提出自己的 SO 问题。
    • 是的...看起来我的信息有点不清楚。见这里:github.com/spring-projects/spring-integration-java-dsl/wiki/…。关键是Java DSL 现在完全是spring-integration-core 的一部分。在过去,它是一个单独的依赖项。
    • 没有。它不会。如果当前消息失败,那么这是一个错误。将其视为某些您自己的逻辑中的顺序方法调用。是的,您可以将 handle() 视为 Camel 的进程,不同之处在于,如果您为当前端点返回 null,则 Spring Integration 不会转到下一个端点。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-04-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多