【问题标题】:Spring Integration - How to debug 'Dispatcher has no Subscribers'?Spring Integration - 如何调试“调度程序没有订阅者”?
【发布时间】:2016-12-31 23:48:16
【问题描述】:

我正在使用 Spring Boot 1.4.0.RELEASE、Spring Integration 4.3.1.RELEASE、Spring Integration DSL 1.2.0.M1。

我正在尝试做的事情:

我正在编写一个应用程序,它将从 FTP 和本地文件系统(使用入站通道适配器)读取文件,将文件传输到本地工作目录(使用文件出站网关),处理,然后将它们移动到最终目的地(文件出站网关/适配器)。

我遇到了“调度程序没有频道订阅者”错误的问题。我相信这可能意味着上下文中的某些内容被破坏并且集成组件没有启动。上下文本身说它在我调试时处于活动状态。

我的实际配置相当大,所以我不是在找人为我找到解决方案。我正在寻找一些关于在哪里查找以及如何找出正在抱怨的组件的指导。

实际错误如下。

DEBUG [integration.channel.ExecutorChannel] [task-scheduler-1] preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application:test.fileReadingFlow.channel#1'.

fileReadingFlow 是一个InboundChannelAdapter,它从目录中读取文件(基本上,我问过here 的内容。其中没有什么复杂的。适配器将消息发送到.log() 处理程序,丰富了标头,将其发送到处理程序 (Files.outboundgateway),最后发送到 MessageChannel

我尝试过的:

  • 我浏览了MessageChannels 链,一切都排好了(没有拼写错误,所有Bean 都存在)。
  • 我在fileReadingFlow 中添加了更多LoggingHandlers,以识别消息错误的位置。
  • 我已经删除了 fileReadingFlow 的部分内容,看看是否可以将消息传递得更远。
  • 我删除了一些Components,看看是否能找到问题。
  • 我为org.springframework.integration 添加了调试日志记录,但没有出现任何错误或警告。

我发现,当流第一次尝试做日志以外的事情(甚至是enrichHeaders)时,发生了Dispatcher错误,消息最终出现在errorChannel中。当我将fileReadingFlow 更改为仅读取文件、记录消息并以空处理程序终止时,我收到了 Dispatcher 错误。因此,我相当确定问题不在于 fileReadingFlow 本身。

除了逐个删除Component之外,有没有办法找出导致错误的原因?

编辑:

来源:

@Bean(name = "fileReadingFlow")
@Scope("prototype")
@Profile("test")
public IntegrationFlow testFileReadingFlow(MyEntity entity) {

    return IntegrationFlows.from(s -> s.file(new File("someFolder")))
            .filter(fileListFilterBuilder.buildFileListFilter(File.class))
            , endpointConfigurer -> endpointConfigurer.poller(poller)
    )
            .log(DEBUG, "com.myco.testFileReadingFlow")
            .enrichHeaders(h ->
                    h.header("entity", entity)
                            .header(FOLDER_NAME, entity.getFolder())
            )
            .log(DEBUG, "com.myco.testFileReadingFlow", message -> "after headers")
            .handle(Files.outboundGateway("workingFolder").deleteSourceFiles(true).autoCreateDirectory(true))
                .log(DEBUG, "com.myco.testFileReadingFLow", message -> "sending message to aggregatingFileChannel " + message)
                .channel("aggregatingFileChannel")
                .get();
    }
@Bean
public MessageChannel aggregatingFileChannel() {
    return MessageChannels.executor(Executors.newCachedThreadPool()).get();

}

@Bean
public IntegrationFlow aggregatingFlow() {

    // Read from the aggregatingFileChannel
    return from("aggregatingFileChannel")
            <...>
            .get();
}

应用:

@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
        basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);

        MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);

        List<MyEntity> entities = entitySvc.findAllActive();

        AutowireCapableBeanFactory beanFactory = context.getBeanFactory();

        entities.forEach(entity -> {
            IntegrationFlow flow = (IntegrationFlow) context.getBean("fileReadingFlow", entity);

            beanFactory.getBean(entity.getFolder() + MyConstants.ADAPTER, Lifecycle.class).start();
    }

解决方案:

根据我下面的 cmets,@Prototype 方法在某些时候确实有效,但我破坏了它并且无法轻松回滚更改。根据 Gary 和 Artem 的建议,我尝试更改为使用 IntegrationFlowContext 方法。为了保留我原来的运行时启动、配置文件驱动注入等,我将IntegrationFlow 的定义从@Configuration 类移动到@Service 类。这样我可以将IntegrationFlowContext 注入Service,并为我的不同配置文件实现不同版本的Service,而无需我的Application 知道Profile。主要方法从从Context 中提取Bean 并手动启动它到检索Service 并调用方法。

@Service
@Profile("test")
public class TestFlowSvc implements FlowSvc {
    public IntegrationFlow testFileReadingFlow(Vendor vendor) {
        return // As previous Flow
    }

    public void startFileReadingFlow(MyEntity entity) {

        IntegrationFlow flow = testFileReadingFlow(entity);

        integrationFlowContext.register(flow, true);
    }
}

应用:

@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
        basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);

        MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);
        FlowSvc flowSvc = context.getBean(FlowSvc.class);
        List<MyEntity> entities = entitySvc.findAllActive();

        entities.forEach(entity -> {
            flowSvc.startFileReadingFlow(entity);
    }

【问题讨论】:

    标签: java spring debugging spring-integration


    【解决方案1】:

    当有一些SubscribableChannel 没有活动的Subscriber(在Spring 集成方面为MessageHandler)时,我们会出现Dispatcher has no Subscribers 错误。 “活动”是指根本没有定义或处于停止状态。

    因此,对于您对 application:test.fileReadingFlow.channel#1 MessageChannel 的问题,我会再查看一次 fileReadingFlow IntegrationFlow 并找出第二个匿名 DirectChannel 上的内容。

    我想不出调试此类问题的简单方法,因为有一个MessageChannel,但没有什么可跟踪的,因为Dispatcher has no Subscribers

    所以,请显示fileReadingFlow IntegrationFlow 的定义,让我们一起解决问题!

    看看你的相关问题应该是这个:

    .handle(Files.outboundGateway())
    

    可能只是处于停止状态。

    但在实际代码之前我们无法确定。

    【讨论】:

    • 启用调试日志并查找类似这样的消息...14:20:51.967 [main] INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 014:20:51.968 [main] INFO o.s.i.channel.DirectChannel - Channel 'application.fromKafka.channel#0' has 1 subscriber(s).
    • @Artem 我添加了示例流程。 @Gary,我看到了这样的消息。我的fileReadingFlow 没有。我认为那是因为它是一个原型范围,并且是手动启动的(包含在编辑中)。
    • 我认为你有@Scope("prototype")的问题。而且您的第二个订阅者还没有开始。由于您使用 Java DSL 1.2,您可以将 IntegrationFlow 转换为 Lifecycle 并启动所有依赖的 bean。在@Scope("prototype") 的情况下,仅启动一个FileReadingMessageSource 是不够的
    • 无论如何,加里所说的都是真的。即使您的IntegrationFlowprototype,所有其他家属也不是。考虑新的IntegrationFlowContext 技术:spring.io/blog/2016/07/08/…
    • @Gary @Artem 感谢您的帮助。 IntegrationFlowContext 似乎正在工作。我用基于此的解决方案更新了我的问题,并接受了上述答案。
    猜你喜欢
    • 2013-08-16
    • 2017-04-10
    • 2018-02-19
    • 2020-04-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-09-12
    • 2015-11-02
    相关资源
    最近更新 更多