【问题标题】:New threads are getting created on polling a directory using spring integration使用 spring 集成轮询目录时正在创建新线程
【发布时间】:2024-05-21 22:05:01
【问题描述】:

这是我们的 spring 配置:

    <int-file:inbound-channel-adapter id="fileReprocessorChannelId" channel="fileReprocessorChannel"
    directory="${file.location}" scanner="headScanner">
    <int:poller cron="${reprocess.cronExpression}" max-messages-per-poll="${reprocess.maxMsgPerPoll}" />
</int-file:inbound-channel-adapter>

<int:chain id="reprocessorChain" input-channel="fileReprocessorChannel" output-channel="transformerChannel">
    <int-file:file-to-string-transformer delete-files="false" charset="UTF-8" />
    <int:header-enricher>
        <int:header name="Operation" value="${operation.fileReprocessor}" overwrite="true" />
        <int:header name="GUID" method="getGuidForReprocessing" ref="headerAttributesGenerator"/>
    </int:header-enricher>

</int:chain>

<bean id="headScanner" class="FileStreamDirectoryScanner">
    <constructor-arg>
        <value>${reprocess.maxMsgPerPoll}</value>
    </constructor-arg>
    <constructor-arg>
        <value>${reprocess.fileAgeInMillis}</value>
    </constructor-arg>
    <property name="locker" ref="nio-locker" />
</bean>

<bean id="nio-locker" class="org.springframework.integration.file.locking.NioFileLocker" />

<int:channel id="transformerChannel">
    <int:interceptors> 
    <int:wire-tap channel="loggerChannel"/>
    </int:interceptors>
</int:channel>  

在磁盘上有大约 10000 个文件的服务器上运行时,当处理大约 7000 个文件时,我们发现以下异常:java.nio.file.FileSystemException: Too many open files.

在调试代码时,线程似乎是在这里创建的:https://github.com/spring-projects/spring-integration/blob/master/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java#L334

大量线程正在消耗大约 70 个线程的大型 CPU,导致应用程序崩溃。

如果有更好的方法可以做到这一点(我们做错了什么?)或者这是 Spring 代码中的一个已知错误,您能否提出建议?

编辑:

  1. 附加的线程转储: Thread dump

【问题讨论】:

    标签: java spring multithreading spring-integration


    【解决方案1】:

    如果文件位于几个目录中,我会推荐 WatchService。这也可能会启动一个线程,但对于目录,而不是目录中的每个文件。

    【讨论】:

      【解决方案2】:

      默认的taskExecutorSyncTaskExecutor,因此任务在调度程序线程上运行。

      默认的taskScheduler bean 只有 10 个线程,所以你必须有一些你没有显示的其他配置。

      您是否查看过线程转储 (jstack pid) 以了解所有这些线程在做什么?

      【讨论】:

      • 线程似乎试图锁定文件:java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - 在 java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks 停车等待 (一个 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)。 AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
      • 我已经用整个配置更新了原始问题。它是 70 个线程。我误解了 lsof linux 命令的用法。我在日志中得到这个异常:java.nio.file.FileSystemException: Too many open files.
      • 您的线程转储与 Spring 集成完全无关。这说明了 Spring AMQP。您使用哪个版本?要不要升级到最新的:projects.spring.io/spring-amqp
      • 我可以在没有 spring amqp 的情况下重现这个。这相关吗?:java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - 停车等待 (a java.util.concurrent.locks .AbstractQueuedSynchronizer$Condit‌​ionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java‌​:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$Condit‌​ionObject.await(Abst‌​ractQueuedSynchroniz‌​er. java:2039)