【问题标题】:Download multiple files using SourcePollingChannelAdapter- start/stop issue?使用 SourcePollingChannelAdapter 下载多个文件 - 启动/停止问题?
【发布时间】:2016-06-09 22:13:36
【问题描述】:

我的要求是从我的 spring-batch 应用程序的文件服务器中的远程目录下载多个文件。 here 的示例之一建议使用 SourcePollingChannelAdapter。在此示例中,适配器已启动 adapter.start() 但并未停止。那么它的生命周期是如何工作的呢?我的要求是首先下载并且只有当所有文件都下载后,然后继续阅读文件。但这似乎是下载文件的一种异步过程。那么,当所有文件都已下载并准备好继续进行时,我将如何获得通知?我没有看到任何其他方法可以向适配器/可轮询通道注入任何处理程序。

使用的示例配置:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapterBean"
        auto-startup="false" channel="receiveChannelBean" session-factory="sftpSessionFactory"
        local-directory="${sftp.download.localDirResource}"
        remote-directory="${sftp.download.remoteFilePath}"
        auto-create-local-directory="true" delete-remote-files="false"
        filename-regex=".*\.csv$">
        <int:poller fixed-rate="5000" max-messages-per-poll="3"  />
    </int-sftp:inbound-channel-adapter>

    <int:channel id="receiveChannelBean">
        <int:queue/>
    </int:channel>

如果显式调用了adapter.stop(),它会抛出InterruptedException。如果不调用 stop ,则以异步方式下载文件而不会阻塞,因此下一步不知道下载是否已完成或正在进行中。

编辑:DownloadingTasklet 的代码片段

 @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        try {

            sftpInbondAdapter.start();
            Thread.sleep(15000); // ---Is this mandatory ?
        } catch (Exception e) {
            throw new BatchClientException("Batch File Download was un-successful !", e);
        } finally {
           // sftpInbondAdapter.stop();
            Logger.logDebug("BATCH_INPUT_FILE_DOWNLOAD", "COMPLETED");
        }
        return RepeatStatus.FINISHED;
    }

【问题讨论】:

    标签: spring-integration spring-batch spring-integration-sftp


    【解决方案1】:

    SftpInboundFileSynchronizingMessageSource 分两个阶段工作:

    • 首先它会从远程目录同步到本地目录:

      Message<File> message = this.fileSource.receive();
      if (message == null) {
          this.synchronizer.synchronizeToLocalDirectory(this.localDirectory);
          message = this.fileSource.receive();
      }
      
    • 正如您看到的那样,它开始将文件作为消息发出。但已经只有本地文件。至此,从 FTP 下载已经完成。

    • 只有在发送过程中清除所有本地文件时才会进行下一次下载尝试。

    所以,您需要的是内置功能。

    编辑

    我们测试中的一些 DEBUG 日志:

    19:29:32,005 DEBUG task-scheduler-1 util.SimplePool:190 - Obtained new org.springframework.integration.sftp.session.SftpSession@26f16625.
    19:29:32,036 DEBUG task-scheduler-1 inbound.SftpInboundFileSynchronizer:287 - cannot copy, not a file: /sftpSource/subSftpSource
    19:29:32,036 DEBUG task-scheduler-1 session.CachingSessionFactory:187 - Releasing Session org.springframework.integration.sftp.session.SftpSession@26f16625 back to the pool.
    19:29:32,036 DEBUG task-scheduler-1 util.SimplePool:221 - Releasing org.springframework.integration.sftp.session.SftpSession@26f16625 back to the pool
    19:29:32,037 DEBUG task-scheduler-1 inbound.SftpInboundFileSynchronizer:270 - 3 files transferred
    19:29:32,037 DEBUG task-scheduler-1 file.FileReadingMessageSource:380 - Added to queue: [local-test-dir\rollback\ sftpSource1.txt, local-test-dir\rollback\sftpSource2.txt]
    19:29:32,043  INFO task-scheduler-1 file.FileReadingMessageSource:368 - Created message: [GenericMessage [payload=local-test-dir\rollback\ sftpSource1.txt, headers={id=40f75bd1-2150-72a4-76f0-f5c619a246da, timestamp=1465514972043}]]
    

    配置如下:

    <int-sftp:inbound-channel-adapter
            session-factory="sftpSessionFactory"
            channel="requestChannel"
            remote-directory="/sftpSource"
            local-directory="local-test-dir/rollback"
            auto-create-local-directory="true"
            local-filter="acceptOnceFilter">
        <int:poller fixed-delay="1" max-messages-per-poll="0"/>
    </int-sftp:inbound-channel-adapter>
    

    注意它如何显示3 files transferred,其中之一是目录,因此跳过。

    怎么说Added to queue:

    只有在那之后它才开始将它们作为消息发出。

    一切都只是因为我有fixed-delay="1"。哪怕只有1毫秒。

    max-messages-per-poll="0" 表示使用一个轮询任务处理所有内容。

    【讨论】:

    • 你能分享更多信息吗?您是说使用此适配器无法满足要求,还是我应该增加任何超时以允许更多时间进行 localDir 同步?
    • 我向您展示了只是阻塞的现有代码。虽然我可以猜到你在那里使用fixed-rate,它开始一个新任务而不是等待前一个任务的完成。因此,将您的&lt;poller&gt; 更改为使用fixed-delay,该进程将是单线程的。因此,在完全下载之前不会发出任何文件。
    • 对,样本使用fixed-rate:github.com/spring-projects/spring-integration-samples/blob/…。这真的可能不足以等待从 FTP 下载的全部内容。
    • 仍然固定延迟没有帮助。我应该明确调用 receive() 吗?如果有多少次,因为我们不知道存在的文件数
    • 你必须在每次测试前清理本地目录:-)
    猜你喜欢
    • 1970-01-01
    • 2013-07-05
    • 2011-07-02
    • 1970-01-01
    • 1970-01-01
    • 2021-10-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多