【发布时间】:2016-12-31 23:48:16
【问题描述】:
我正在使用 Spring Boot 1.4.0.RELEASE、Spring Integration 4.3.1.RELEASE、Spring Integration DSL 1.2.0.M1。
我正在尝试做的事情:
我正在编写一个应用程序,它将从 FTP 和本地文件系统(使用入站通道适配器)读取文件,将文件传输到本地工作目录(使用文件出站网关),处理,然后将它们移动到最终目的地(文件出站网关/适配器)。
我遇到了“调度程序没有频道订阅者”错误的问题。我相信这可能意味着上下文中的某些内容被破坏并且集成组件没有启动。上下文本身说它在我调试时处于活动状态。
我的实际配置相当大,所以我不是在找人为我找到解决方案。我正在寻找一些关于在哪里查找以及如何找出正在抱怨的组件的指导。
实际错误如下。
DEBUG [integration.channel.ExecutorChannel] [task-scheduler-1] preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application:test.fileReadingFlow.channel#1'.
fileReadingFlow 是一个InboundChannelAdapter,它从目录中读取文件(基本上,我问过here 的内容。其中没有什么复杂的。适配器将消息发送到.log() 处理程序,丰富了标头,将其发送到处理程序 (Files.outboundgateway),最后发送到 MessageChannel。
我尝试过的:
- 我浏览了
MessageChannels 链,一切都排好了(没有拼写错误,所有Bean都存在)。 - 我在
fileReadingFlow中添加了更多LoggingHandlers,以识别消息错误的位置。 - 我已经删除了
fileReadingFlow的部分内容,看看是否可以将消息传递得更远。 - 我删除了一些
Components,看看是否能找到问题。 - 我为
org.springframework.integration添加了调试日志记录,但没有出现任何错误或警告。
我发现,当流第一次尝试做日志以外的事情(甚至是enrichHeaders)时,发生了Dispatcher错误,消息最终出现在errorChannel中。当我将fileReadingFlow 更改为仅读取文件、记录消息并以空处理程序终止时,我收到了 Dispatcher 错误。因此,我相当确定问题不在于 fileReadingFlow 本身。
除了逐个删除Component之外,有没有办法找出导致错误的原因?
编辑:
来源:
@Bean(name = "fileReadingFlow")
@Scope("prototype")
@Profile("test")
public IntegrationFlow testFileReadingFlow(MyEntity entity) {
return IntegrationFlows.from(s -> s.file(new File("someFolder")))
.filter(fileListFilterBuilder.buildFileListFilter(File.class))
, endpointConfigurer -> endpointConfigurer.poller(poller)
)
.log(DEBUG, "com.myco.testFileReadingFlow")
.enrichHeaders(h ->
h.header("entity", entity)
.header(FOLDER_NAME, entity.getFolder())
)
.log(DEBUG, "com.myco.testFileReadingFlow", message -> "after headers")
.handle(Files.outboundGateway("workingFolder").deleteSourceFiles(true).autoCreateDirectory(true))
.log(DEBUG, "com.myco.testFileReadingFLow", message -> "sending message to aggregatingFileChannel " + message)
.channel("aggregatingFileChannel")
.get();
}
@Bean
public MessageChannel aggregatingFileChannel() {
return MessageChannels.executor(Executors.newCachedThreadPool()).get();
}
@Bean
public IntegrationFlow aggregatingFlow() {
// Read from the aggregatingFileChannel
return from("aggregatingFileChannel")
<...>
.get();
}
应用:
@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);
MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);
List<MyEntity> entities = entitySvc.findAllActive();
AutowireCapableBeanFactory beanFactory = context.getBeanFactory();
entities.forEach(entity -> {
IntegrationFlow flow = (IntegrationFlow) context.getBean("fileReadingFlow", entity);
beanFactory.getBean(entity.getFolder() + MyConstants.ADAPTER, Lifecycle.class).start();
}
解决方案:
根据我下面的 cmets,@Prototype 方法在某些时候确实有效,但我破坏了它并且无法轻松回滚更改。根据 Gary 和 Artem 的建议,我尝试更改为使用 IntegrationFlowContext 方法。为了保留我原来的运行时启动、配置文件驱动注入等,我将IntegrationFlow 的定义从@Configuration 类移动到@Service 类。这样我可以将IntegrationFlowContext 注入Service,并为我的不同配置文件实现不同版本的Service,而无需我的Application 知道Profile。主要方法从从Context 中提取Bean 并手动启动它到检索Service 并调用方法。
@Service
@Profile("test")
public class TestFlowSvc implements FlowSvc {
public IntegrationFlow testFileReadingFlow(Vendor vendor) {
return // As previous Flow
}
public void startFileReadingFlow(MyEntity entity) {
IntegrationFlow flow = testFileReadingFlow(entity);
integrationFlowContext.register(flow, true);
}
}
应用:
@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);
MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);
FlowSvc flowSvc = context.getBean(FlowSvc.class);
List<MyEntity> entities = entitySvc.findAllActive();
entities.forEach(entity -> {
flowSvc.startFileReadingFlow(entity);
}
【问题讨论】:
标签: java spring debugging spring-integration