【问题标题】:"Found ambiguous parameter type Void"? Spring Integration with Project Reactor“发现模棱两可的参数类型无效”? Spring 与 Project Reactor 的集成
【发布时间】:2022-01-18 15:19:52
【问题描述】:

我正在使用带有 Spring Integration 的 Project Reactor 从 Kafka 读取并写入 MongoDB,我的 Kafka 使用效果很好,但 .handle(MongoDb.reactiveOutboundChannelAdapter(mongoFactory)) 卡住了。我已经看到这个函数的内部代码是new ReactiveMongoDbStoringMessageHandler(mongoFactory)),所以我尝试了以下方法(我有一个transform()方法,可以从ConsumerRecord转换为Mono<String>,带有@Transformer注解):

    public IntegrationFlows writeToMongo() {
         return IntegrationFlows.from(kafkaChannel)
              .transform(this)
              .handle(new ReactiveMongoDbStoringMessageHandler(mongoFactory))
              .get();
    }

代码遵循文档https://docs.spring.io/spring-integration/reference/html/mongodb.html#mongodb-reactive-channel-adapters。 我得到的错误是: java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match: 然后是一长串函数。有什么原因会发生这种情况?

【问题讨论】:

    标签: spring spring-integration project-reactor reactor-kafka spring-data-mongodb-reactive


    【解决方案1】:

    如果您不打算订阅返回的Mono,则不能执行new ReactiveMongoDbStoringMessageHandler(mongoFactory).handle(MongoDb.reactiveOutboundChannelAdapter(mongoFactory)) 是正确的做法,因为它将 ReactiveMongoDbStoringMessageHandler 包装到 ReactiveMessageHandlerAdapter 中以进行自动订阅。

    但我认为你真正的问题在于.transform(this)。我相信您在这个类中有很多方法,因此请更具体地使用方法名称。而这与 Project Reactor 无关。不确定为什么要在发送到ReactiveMongoDbStoringMessageHandler 之前尝试转换为Mono...您可能在提供有效负载(ConsumerRecord?)时遇到问题,它不是用于保存到集合中的MongoDB 映射实体。

    【讨论】:

    • 是的,ConsumerRecord 类型确实存在问题,我想将其转换为 Mongo 能够映射的字符串。我不明白reactiveOutboundChannelAdapter(mongoFactory) 如何将ReactiveMongoDbStoringMessageHandler 包装成ReactiveMessageHandlerAdapter 以进行自动订阅(只是出于好奇,因为我在代码中没有看到)。无论如何,当我转换回.handle(MongoDb.reactiveOutboundChannelAdapter(mongoFactory)) 时,客户卡住了。你知道为什么吗?
    • 我已经通过.collectionName("colName") 指定一个集合来修复 mongo 写入。我会将其添加到文档中。我还认为 Java DSL 的反应式 Spring 集成示例数量非常有限(甚至 DSL 的非反应式示例数量也很有限)。如果不难,我建议添加一些。
    • ReactiveMessageHandlerAdapter 在其 handleMessage() impl: this.delegate.handleMessage(message).subscribe(); 中自动订阅。
    • ReactiveMongoDbMessageHandlerSpecReactiveMessageHandlerSpec 的扩展,它为我们做包装:this.target = new ReactiveMessageHandlerAdapter(this.reactiveMessageHandler);
    • 默认集合名称 (data) 在文档中进行了说明:docs.spring.io/spring-integration/docs/current/reference/html/…。 > 在这个示例中,我们将通过提供的ReactiveMongoDatabaseFactory 连接到 MongoDb,并将请求消息中的数据存储到具有数据名称的默认集合中。
    【解决方案2】:

    我也遇到过。解决方案是(注意 ReactiveMessageHandlerAdapter):

    public IntegrationFlows writeToMongo() {
         return IntegrationFlows.from(kafkaChannel)
              .handle(new ReactiveMessageHandlerAdapter(new ReactiveMongoDbStoringMessageHandler(mongoFactory)))
              .get();
    }
    

    this issue 将被解析时,这可以替换为handleReactive()

    【讨论】:

      猜你喜欢
      • 2014-07-04
      • 1970-01-01
      • 1970-01-01
      • 2020-05-11
      • 1970-01-01
      • 1970-01-01
      • 2011-07-23
      • 1970-01-01
      • 2011-12-19
      相关资源
      最近更新 更多