【问题标题】:Cannot pass callback lambdas into .toReactivePublisher().subscribe() in Spring Integration with Project Reactor无法将回调 lambdas 传递到 Spring 与 Project Reactor 集成中的 .toReactivePublisher().subscribe()
【发布时间】:2022-01-23 20:51:17
【问题描述】:

当我将 Spring Integration 与 Project Reactor 一起使用时,我收到了 Caused by: java.lang.IllegalStateException: No subscriptions have been created 的错误,我试图弄清楚如何订阅。我原来的代码是:

    @Bean
    public IntegrationFlow writeToKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(payload -> {
                    return new GenericMessage<ConsumerRecord<String, String>>(payload);
                }))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .get();
    }

在它抛出错误后,我尝试订阅,但我不明白应该将什么传递给 subscribe 方法,这似乎与常规反应式 .subscribe() 的行为不同。

    @Bean
    public void writeToKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(payload -> {
                    return new GenericMessage<ConsumerRecord<String, String>>(payload);
                }))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .toReactivePublisher().subscribe(value -> {
                log.info("Wrote: " + value);
            });
    }

【问题讨论】:

    标签: java spring spring-boot spring-integration project-reactor


    【解决方案1】:

    这样做.toReactivePublisher().subscribe() 组合不正确。 IntegrationFlow 必须首先公开并配置为 bean。只有这样,在您的服务中某处注入此 bean 后,您才能将 subscribe() 注入到该 Publisher bean。

    您错过了这样一个事实,即必须首先在其依赖注入容器中初始化控制反转,然后才能对这些 bean 进行一些实际工作(订阅)。

    编辑

    例如我的测试用例:

    @SpringJUnitConfig
    @DirtiesContext
    public class ReactiveStreamsTests {
    
        @Autowired
        @Qualifier("pollableReactiveFlow")
        private Publisher<Message<Integer>> pollablePublisher;
    
        @Autowired
        private AbstractEndpoint reactiveTransformer;
    
        @Autowired
        @Qualifier("inputChannel")
        private MessageChannel inputChannel;
    
        @Test
        void testPollableReactiveFlow() throws Exception {
            assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);
            this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));
    
            CountDownLatch latch = new CountDownLatch(6);
    
            Flux.from(this.pollablePublisher)
                    .take(6)
                    .filter(m -> m.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                    .doOnNext(p -> latch.countDown())
                    .subscribe();
    
            ExecutorService exec = Executors.newSingleThreadExecutor();
            Future<List<Integer>> future =
                    exec.submit(() ->
                            Flux.just("11,12,13")
                                    .map(v -> v.split(","))
                                    .flatMapIterable(Arrays::asList)
                                    .map(Integer::parseInt)
                                    .<Message<Integer>>map(GenericMessage::new)
                                    .concatWith(this.pollablePublisher)
                                    .take(7)
                                    .map(Message::getPayload)
                                    .collectList()
                                    .block(Duration.ofSeconds(10))
                    );
    
            this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));
    
            assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();
            List<Integer> integers = future.get(20, TimeUnit.SECONDS);
    
            assertThat(integers).isNotNull();
            assertThat(integers.size()).isEqualTo(7);
            exec.shutdownNow();
        }
    
        @Configuration
        @EnableIntegration
        public static class ContextConfiguration {
    
            @Bean
            public Publisher<Message<Integer>> pollableReactiveFlow() {
                return IntegrationFlows
                        .from("inputChannel")
                        .split(s -> s.delimiters(","))
                        .<String, Integer>transform(Integer::parseInt,
                                e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer"))
                        .channel(MessageChannels.queue())
                        .log()
                        .toReactivePublisher();
            }
    
        }
    
    }
    

    【讨论】:

    • 你能举个简单的例子吗?
    • 在我的回答中查看编辑。
    • 谢谢。它帮助很大,但我的需求完全不同,因为我确实尝试使用IntegrationFlow。我提出了一个新问题,并提供了更准确的示例来满足我的需要。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-30
    • 2018-06-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多