【发布时间】:2022-01-23 20:51:17
【问题描述】:
当我将 Spring Integration 与 Project Reactor 一起使用时,我收到了 Caused by: java.lang.IllegalStateException: No subscriptions have been created 的错误,我试图弄清楚如何订阅。我原来的代码是:
@Bean
public IntegrationFlow writeToKafka() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
.map(payload -> {
return new GenericMessage<ConsumerRecord<String, String>>(payload);
}))
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.channel(c -> c.queue("resultChannel"))
.get();
}
在它抛出错误后,我尝试订阅,但我不明白应该将什么传递给 subscribe 方法,这似乎与常规反应式 .subscribe() 的行为不同。
@Bean
public void writeToKafka() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
.map(payload -> {
return new GenericMessage<ConsumerRecord<String, String>>(payload);
}))
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.channel(c -> c.queue("resultChannel"))
.toReactivePublisher().subscribe(value -> {
log.info("Wrote: " + value);
});
}
【问题讨论】:
标签: java spring spring-boot spring-integration project-reactor