【Question title】: No subscriptions have been created in Reactor Kafka and Spring Integration
【Posted】: 2022-01-24 15:25:56
【Question】:

I am trying to build a simple flow with Spring Integration and Project Reactor: I consume records with Reactor Kafka, pass them to a channel, and from there produce messages to another topic, again with Reactor Kafka.

The consuming flow is:

@Service
public class ReactiveConsumerService {
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;

    @Qualifier("directChannel")
    @Autowired
    public MessageChannel directChannel;

    public ReactiveConsumerService(ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate) {
        this.reactiveKafkaConsumerTemplate = reactiveKafkaConsumerTemplate;
    }

    @Bean
    public IntegrationFlow readFromKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(GenericMessage::new))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .<String, String>transform(String::toUpperCase)
            .channel(directChannel)
            .get();
    }
}

And the producing flow is:

@Service
public class ReactiveProducerService {
    private final ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate;

    @Qualifier("directChannel")
    @Autowired
    public MessageChannel directChannel;

    public ReactiveProducerService(ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate) {
        this.reactiveKafkaProducerTemplate = reactiveKafkaProducerTemplate;
    }

    @Bean
    public IntegrationFlow kafkaProducerFlow() {
        return IntegrationFlows.from(directChannel)
            .handle(s -> reactiveKafkaProducerTemplate.send("topic2", s.getPayload().toString()))
            .get();
    }
}

I would like to know how and where I should perform the subscription.

Edit:

I added .subscribe(), but it still does not work:

2022-01-25 20:36:59.570  INFO 1804 --- [ration-sample-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.consumer for consumer-reactive-kafka-spring-integration-sample-1 unregistered
2022-01-25 20:36:59.573 ERROR 1804 --- [oundedElastic-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: No subscriptions have been created
Caused by: java.lang.IllegalStateException: No subscriptions have been created
    at reactor.kafka.receiver.ReceiverOptions.subscriber(ReceiverOptions.java:423) ~[reactor-kafka-1.3.9.jar:1.3.9]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxPeekFuseable] :
    reactor.core.publisher.Flux.doOnRequest
Caused by: java.lang.IllegalStateException: No subscriptions have been created

    reactor.kafka.receiver.internals.ConsumerHandler.receive(ConsumerHandler.java:110)
Error has been observed at the following site(s):
    *________Flux.doOnRequest ? at reactor.kafka.receiver.internals.ConsumerHandler.receive(ConsumerHandler.java:110)
    |_            Flux.filter ? at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$receiveAutoAck$6(DefaultKafkaReceiver.java:70)
    |_         Flux.publishOn ? at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$receiveAutoAck$6(DefaultKafkaReceiver.java:71)
    |_               Flux.map ? at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$receiveAutoAck$6(DefaultKafkaReceiver.java:72)
    *______________Flux.using ? at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$withHandler$19(DefaultKafkaReceiver.java:137)
    *__________Flux.usingWhen ? at reactor.kafka.receiver.internals.DefaultKafkaReceiver.withHandler(DefaultKafkaReceiver.java:129)
    |_                        ? at reactor.kafka.receiver.internals.DefaultKafkaReceiver.receiveAutoAck(DefaultKafkaReceiver.java:68)
    |_                        ? at reactor.kafka.receiver.KafkaReceiver.receiveAutoAck(KafkaReceiver.java:124)
    |_         Flux.concatMap ? at org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate.receiveAutoAck(ReactiveKafkaConsumerTemplate.java:69)
    |_               Flux.map ? at reactor.kafka.spring.integration.samples.service.ReactiveConsumerService.readFromKafka(ReactiveConsumerService.java:38)
    |_              Flux.from ? at org.springframework.integration.channel.FluxMessageChannel.subscribeTo(FluxMessageChannel.java:118)
    |_ Flux.delaySubscription ? at org.springframework.integration.channel.FluxMessageChannel.subscribeTo(FluxMessageChannel.java:119)
    |_         Flux.publishOn ? at org.springframework.integration.channel.FluxMessageChannel.subscribeTo(FluxMessageChannel.java:120)
    |_          Flux.doOnNext ? at org.springframework.integration.channel.FluxMessageChannel.subscribeTo(FluxMessageChannel.java:121)
Original Stack Trace:
        at reactor.kafka.receiver.ReceiverOptions.subscriber(ReceiverOptions.java:423) ~[reactor-kafka-1.3.9.jar:1.3.9]
        at reactor.kafka.receiver.internals.ConsumerEventLoop$SubscribeEvent.run(ConsumerEventLoop.java:207) ~[reactor-kafka-1.3.9.jar:1.3.9]
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.14.jar:3.4.14]
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.14.jar:3.4.14]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2022-01-25 20:36:59.772  INFO 1804 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8090
2022-01-25 20:36:59.853  INFO 1804 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Routes startup summary (total:0 started:0)
2022-01-25 20:36:59.853  INFO 1804 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 3.12.0 (camel-1) started in 149ms (build:84ms init:59ms start:6ms)
2022-01-25 20:36:59.866  INFO 1804 --- [           main] ReactorKafkaSpringIntegrationApplication : Started ReactorKafkaSpringIntegrationApplication in 4.246 seconds (JVM running for 4.616)

Sample code:

@Service
public class ReactiveProducerService {
    private final ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate;

    @Qualifier("directChannel")
    @Autowired
    public MessageChannel directChannel;

    public ReactiveProducerService(ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate) {
        this.reactiveKafkaProducerTemplate = reactiveKafkaProducerTemplate;
    }

    @Bean
    public IntegrationFlow kafkaProducerFlow() {
        return IntegrationFlows.from(directChannel)
            .handle(s -> reactiveKafkaProducerTemplate.send("topic2", s.getPayload().toString()).subscribe(System.out::println))
            .get();
    }
}

【Question discussion】:

    Tags: spring spring-boot spring-integration project-reactor reactor-kafka


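    【Solution 1】:

    The reactiveKafkaConsumerTemplate is subscribed to immediately, as soon as the endpoint for .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value) is started automatically by the application context.

    Consider this as an alternative: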

    /**
     * Represent an Integration Flow as a Reactive Streams {@link Publisher} bean.
     * @param autoStartOnSubscribe start message production and consumption in the flow,
     * when a subscription to the publisher is initiated.
     * If this set to true, the flow is marked to not start automatically by the application context.
     * @param <T> the expected {@code payload} type
     * @return the Reactive Streams {@link Publisher}
     * @since 5.5.6
     */
    @SuppressWarnings(UNCHECKED)
    protected <T> Publisher<Message<T>> toReactivePublisher(boolean autoStartOnSubscribe) {
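
A rough sketch of how that alternative might be wired, assuming Spring Integration 5.5.6 or later and the same reactiveKafkaConsumerTemplate bean as in the question. The class name LazyConsumerFlowConfig and the bean name upperCasedRecords are hypothetical. With autoStartOnSubscribe set to true, the flow should stay idle until something subscribes to the returned Publisher:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Publisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class LazyConsumerFlowConfig {

    @Bean
    public Publisher<Message<String>> upperCasedRecords(
            ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate) {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                        .map(GenericMessage::new))
                .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
                .<String, String>transform(String::toUpperCase)
                // true: do not auto-start; start the flow when a subscriber arrives
                .toReactivePublisher(true);
    }
}
```

A consumer of this bean would then decide when the flow starts, for example by calling Flux.from(upperCasedRecords).subscribe(...) at the point where it is ready to receive messages.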
    

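    That said, I think you mean the subscription on the outbound side. It is not clear from your question, but the contract of reactiveKafkaProducerTemplate looks like this: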

    public Mono<SenderResult<Void>> send(String topic, V value) {
    

    So you need to subscribe to the returned Mono to initiate the send.
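
To illustrate why the subscription matters, here is a minimal, dependency-free sketch. It deliberately uses the JDK's java.util.concurrent.Flow API instead of Reactor, and the send() method below is a hypothetical stand-in for ReactiveKafkaProducerTemplate.send(), not the real implementation. It only shows the general "cold publisher" behaviour: building the publisher does no work, and the send runs once a subscriber requests data.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class ColdSendDemo {

    // Counts how many simulated sends have actually been executed.
    public static final AtomicInteger SENDS = new AtomicInteger();

    // Hypothetical stand-in for ReactiveKafkaProducerTemplate.send(): it only
    // describes the work; the send runs when a subscriber requests data.
    public static Flow.Publisher<String> send(String topic, String value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                done = true;
                SENDS.incrementAndGet(); // the side effect happens only here
                subscriber.onNext(topic + " <- " + value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Minimal subscriber that requests one item and prints it.
    public static Flow.Subscriber<String> printingSubscriber() {
        return new Flow.Subscriber<>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("sent: " + item);
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }

            @Override
            public void onComplete() {
                // nothing to clean up in this sketch
            }
        };
    }

    public static void main(String[] args) {
        Flow.Publisher<String> pending = send("topic2", "HELLO");
        System.out.println("sends before subscribe: " + SENDS.get()); // 0
        pending.subscribe(printingSubscriber());                      // sent: topic2 <- HELLO
        System.out.println("sends after subscribe: " + SENDS.get());  // 1
    }
}
```

The Mono returned by the real template behaves analogously in this respect: calling send() inside handle() without subscribing to the result leaves the record unsent.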

    Note: the arguments to send() are mixed up as well. Didn't you mean this: reactiveKafkaProducerTemplate.send("test", "topic2")

    To subscribe to that Mono, you just need to do it yourself inside that handle():

    .handle(s -> reactiveKafkaProducerTemplate.send("topic2", "test").subscribe())
    

    Update 2

    An error like java.lang.IllegalStateException: No subscriptions have been created, thrown from reactor.kafka.receiver.ReceiverOptions.subscriber(), means that you have not assigned any topics, pattern, or partitions to listen to.

    See ReceiverOptions.subscription() and ReceiverOptions.assignment().
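
A minimal configuration sketch of what that could look like, assuming String keys and values as in the question. The broker address, group id, topic name "topic1", and the class name ReactiveKafkaConsumerConfig are placeholders that do not come from the question; adjust them to your setup. The important part is that the ReceiverOptions passed to the template are the ones returned by subscription():

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;

// Hypothetical configuration class; names and values are illustrative only.
@Configuration
public class ReactiveKafkaConsumerConfig {

    @Bean
    public ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address and consumer group; replace with your own.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Without subscription(...) or assignment(...), the receiver fails with
        // "No subscriptions have been created" as soon as it is subscribed to.
        return ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singletonList("topic1"));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, String> receiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }
}
```

Use the value returned by subscription() rather than the instance it was called on, since the call returns the configured options instead of necessarily modifying the original in place.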

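    【Comments】:

    • I have fixed the sample so that it uses the lambda argument. For some reason I still get the same error, even when I subscribe exactly as you did in your last example.
    • You did not mention any "error" in the question. Either you are not sharing all the information, or you are mixing this up with your other question. Please share that error and we will try to fix it together.
    • That is because the error is very simple: I just get java.lang.IllegalStateException: No subscriptions have been created. I have edited the question and included a larger stack trace.
    • See Update 2 in my answer.
    • That was not so simple to tell from the code you originally showed in the question...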