【问题标题】:Spring IntegrationFlow CompositeFileListFilter Not WorkingSpring IntegrationFlow CompositeFileListFilter 不工作
【发布时间】:2020-01-27 16:33:20
【问题描述】:

我有两个过滤器 regexFilter 和 lastModified。

return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .regexFilter(config.getRegexFilter())
            .filter(new LastModifiedLsEntryFileListFilter())
            .remoteDirectory(config.getInboundDirectory())
            , e -> e.poller(Pollers.fixedDelay(60_000)
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {

    })))

通过谷歌搜索,我了解到我必须将 CompositeFileListFilter 用于正则表达式,因此将我的代码更改为

.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))

它已编译但在运行时会抛出错误,并且通道会弯曲,并且会出现同样的错误

.filter(ftpPersistantFilter(config.getRegexFilter()))
.
.
.

public CompositeFileListFilter ftpPersistantFilter(String regexFilter) {
        CompositeFileListFilter filters = new CompositeFileListFilter();
            filters.addFilter(new FtpRegexPatternFileListFilter(regexFilter));
        return filters;
    }

我只想根据文件名进行过滤。同一个远程文件夹有 2 个流程,并且都使用相同的 cron 进行轮询,但应该选择它们的相关文件。

编辑 添加最后一个 LastModifiedLsEntryFileListFilter。它工作正常,但应要求添加。

public class LastModifiedLsEntryFileListFilter implements FileListFilter<LsEntry> {

private final Logger log = LoggerFactory.getLogger(LastModifiedLsEntryFileListFilter.class);
private static final long DEFAULT_AGE = 60;

private volatile long age = DEFAULT_AGE;

private volatile Map<String, Long> sizeMap = new HashMap<String, Long>();


public long getAge() {
    return this.age;
}

public void setAge(long age) {
    setAge(age, TimeUnit.SECONDS);
}

public void setAge(long age, TimeUnit unit) {
    this.age = unit.toSeconds(age);
}

@Override
public List<LsEntry> filterFiles(LsEntry[] files) {

    List<LsEntry> list = new ArrayList<LsEntry>();

    long now = System.currentTimeMillis() / 1000;

    for (LsEntry file : files) {

        if (file.getAttrs()
                .isDir()) {
            continue;
        }
        String fileName = file.getFilename();
        Long currentSize = file.getAttrs().getSize();
        Long oldSize = sizeMap.get(fileName);

        if(oldSize == null || currentSize.longValue() != oldSize.longValue() ) {
            // putting size in map, will verify in next iteration of scheduler
            sizeMap.put(fileName, currentSize);
            log.info("[{}] old size [{}]  increased to [{}]...", file.getFilename(), oldSize, currentSize);
            continue;
        }

        int lastModifiedTime = file.getAttrs()
            .getMTime();

        if (lastModifiedTime + this.age <= now ) {
            list.add(file);
            sizeMap.remove(fileName);
        } else {
            log.info("File [{}] is still being uploaded...", file.getFilename());
        }
    }
    return list;
}

}

PS:当我测试正则表达式的过滤器时,为了简单起见,我删除了 LastModifiedLsEntryFileListFilter。所以我的最终流程是

return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
            //.filter(new LastModifiedLsEntryFileListFilter())
            .remoteDirectory(config.getInboundDirectory()),
            e -> e.poller(Pollers.fixedDelay(60_000)
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
                try {

                    this.destroy(String.valueOf(config.getId()));


    configurationService.removeConfigurationChannelById(config.getId());

//                // logging here
                } catch (Exception ex1) {
            }
            }))).publishSubscribeChannel(s -> s
            .subscribe(f -> {

                f.handle(Sftp.outboundAdapter(outboundSftp)
                        .useTemporaryFileName(false)
                        .autoCreateDirectory(true)
                        .remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));

            })
            .subscribe(f -> {
                if (doArchive) {
                    f.handle(Sftp.outboundAdapter(inboundSftp)
                            .useTemporaryFileName(false)
                            .autoCreateDirectory(true)
                            .remoteDirectory(config.getInboundArchiveDirectory()));
                } else {
                    f.handle(m -> {
                    });
                }

            })
            .subscribe(f -> f
            .handle(m -> {

                // I am handling exception here
            })
            ))
            .get();

这里有例外

2020-01-27 21:36:55,731 INFO o.s.i.c.PublishSubscribeChannel - Channel 

'application.2.subFlow#0.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2020-01-27 21:36:55,731 INFO o.s.i.c.DirectChannel - Channel 'application.2.subFlow#2.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1

编辑 将正则表达式传递给 LastModifiedLsEntryFileListFilter 并在那里处理对我有用。当我在 CompositeFileListFilter 中使用任何其他 RegexFilter 时,它会引发错误。

.filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))

【问题讨论】:

    标签: spring-integration spring-integration-dsl spring-integration-sftp


    【解决方案1】:

    请显示您的最终流程。我没有看到您在CompositeFileListFilter 中使用LastModifiedLsEntryFileListFilter...您绝对不能同时使用regexFilter()filter() - 最后一个获胜。为避免混淆,我们建议使用filter() 并使用CompositeFileListFilterChainFileListFilter 组合所有这些。

    请问您提到的错误是什么。

    【讨论】:

    • 我已经更新了问题。 LastModifiedLsEntryFileListFilter 已经正常工作了。
    • 现在可以用了吗?那你可以删除这个SO线程吗?我不确定我们可以讨论什么,因为您没有提供明确的问题。而且看起来也不例外……
    • 不,它不适合我。我同意也不例外,但频道在不传输文件的情况下关闭。
    • 能否请您修改您的问题以获取最具体的信息。现在还不清楚你的流程到底是什么,你有什么错误。顺便说一句,当您像这样测试轮询流程时,您必须确保在测试方法中以某种方式阻塞,因为流程是在另一个调度线程中执行的。因此,您可能会在消息到达您的频道之前更早地退出您的测试方法。不过,对于测试,您也可以考虑使用MockIntegrationdocs.spring.io/spring-integration/docs/5.2.3.RELEASE/reference/…
    • .filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))) 我在 LastModified 中传递我的正则表达式并在那里处理。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-17
    相关资源
    最近更新 更多