【问题标题】:How to run spring integration with multithread如何使用多线程运行 Spring 集成
【发布时间】:2018-11-07 21:49:05
【问题描述】:

我想通过 spring 集成做以下事情

  1. 从 sftp 获取文件
  2. 将下载的文件发送到 http 和 s3

这是我目前所拥有的。

@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "100000", maxMessagesPerPoll = "3"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setMaxFetchSize(2);
        return source;
    }

这是我的服务激活器。 我的服务激活器的问题在于它与轮询器在同一线程中运行,因此当文件处理时间过长时,它不会处理下一个,直到第一个完成。

@ServiceActivator(inputChannel = "sftpChannel")
        public void sftpChannel(@Payload File payload, @Header("timestamp") long timestamp) {
            log.info("Message arrived at sftpChannel");
         //do something with file
    }

如何在单独的线程上运行文件进程并释放轮询线程,以便轮询可以继续从 sftp 拉取文件?

【问题讨论】:

  • @Poller 采用 taskExecutor,您可以将其配置为异步运行。
  • 请形成您的评论作为答案。这是真的。
  • 感谢@tsolakp。在taskExecutor中,我是不是只执行任务?但是那我该如何配置线程呢? ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setMaxPoolSize(5); 例如:pool.execute(() -&gt; process(payload, timestamp), 1000); 你有样品吗?
  • 我没有例子,但@Artem Bilan 的这篇文章可能会有所帮助:stackoverflow.com/questions/36169903/…

标签: java spring spring-integration spring-integration-sftp


【解决方案1】:

您可以使用@Async 注释在单独的线程中运行任何方法。您只需在任何@Configuration 文件中添加@EnableAsync,当您调用它时,它将异步运行。您可以在此blog 中找到更多信息。

【讨论】:

    【解决方案2】:

    类似这样的:

    @Bean
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(5);
        return executor; 
    }
    

    并在@InboundChannelAdapter@Poller 中使用executor 作为bean 名称:

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", 
            poller = @Poller(fixedDelay = "100000", maxMessagesPerPoll = "3", taskExecutor="executor"))
    

    @PollerJavaDocs:

    /**
     * @return The {@link org.springframework.core.task.TaskExecutor} bean name.
     */
    String taskExecutor() default "";
    

    另请参阅参考手册中的文档:https://docs.spring.io/spring-integration/docs/5.0.9.RELEASE/reference/html/messaging-channels-section.html#conditional-pollers

    重要提示:异步切换

    此建议根据receive() 结果修改触发器。这仅在轮询线程上调用通知时才有效。如果轮询器有task-executor,它将不起作用。要在轮询结果后使用异步操作的地方使用此建议,请稍后执行异步切换,可能使用ExecutorChannel

    【讨论】:

    • 我试过你的例子,但轮询器似乎只在 1 个线程上运行:'(ThreadPoolTaskExecutor executor 一直在 task1 上运行
    • 轮询器 - 是的,因为你使用了一个固定的延迟,所以一个新的任务直到当前的完成加上这 100000 毫秒才会开始。另一方面,您应该并行处理 3 条消息
    • 也许我错过了对这个入站频道的了解。所以轮询器只是从 sftp 中提取文件,对吗?它当时确实拉了2个文件。那么在这种情况下,我应该为服务激活器创建另一个线程,以便在轮询器将它们拉到不同的线程后处理我的文件(发送到 http 和 S3)?
    • 我猜你需要考虑使用 PublishSubcribeChannel 和执行器来实现并行性,以及那些 http 和 S3 作为订阅者
    猜你喜欢
    • 1970-01-01
    • 2017-07-30
    • 1970-01-01
    • 2017-06-22
    • 1970-01-01
    • 1970-01-01
    • 2020-02-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多