【问题标题】:Terminate threads in a thread pool for Multiple Tenant终止多租户线程池中的线程
【发布时间】:2023-02-20 18:40:01
【问题描述】:

我正在为作业创建一个线程池,如下所示。

public class MoveToCherwellThreadPool {

public static ThreadPoolExecutor cherwellMoveThreadPoolExecutor = null;
private static EMLogger logger = EMLogger.getLogger();
private static final String CLASSNAME = "MoveToCherwellThreadPool";

    public static void initiateCherwellMoveThreadPool() {
        BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(100000);
        cherwellMoveThreadPoolExecutor = new ThreadPoolExecutor(10,20, 20, TimeUnit.SECONDS, q);
        cherwellMoveThreadPoolExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r,
                    ThreadPoolExecutor executor) {
                logger.logDebug(CLASSNAME,"Rejected task cherwellMoveThreadPoolExecutor Active tasks : " + cherwellMoveThreadPoolExecutor.getActiveCount() + ", " + "cherwellMoveThreadPoolExecutor Completed tasks : " + cherwellMoveThreadPoolExecutor.getCompletedTaskCount()+" Waiting for a second !! ");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                executor.execute(r);
            }
        });
    }
    
}

我在为多个客户运行的过程中使用它。对于每个客户,新的线程池将被初始化并且线程将运行。 下面是我使用线程池的代码。

for (Object[] objects : relationshipList) {
                        CherwellRelationshipMoveThread relationshipThread = new CherwellRelationshipMoveThread(objects,
                                this.customerId, sb, credential,mainCIId,moveUniqueId,this.startTime);
                        CompletableFuture<?> future = CompletableFuture.runAsync(relationshipThread,
                                MoveToCherwellThreadPool.cherwellMoveThreadPoolExecutor);
                        crelationshipList.add(future);
                }
                crelationshipList.forEach(CompletableFuture::join);

将为多个客户创建此线程。我提供了在 UI 中终止此作业的选项。单击停止进程后,我只需要停止/终止为该特定客户运行的线程,而其他客户的线程不应受到损害,应继续运行。

单击 UI 中的停止进程时,我正在调用一个服务,我的代码将在服务内部

MoveToCherwellThreadPool.cherwellMoveThreadPoolExecutor.shutdownNow();

我在 ThreadPoolExecutor 上调用 shutdownNow()。

这正在杀死所有客户的所有线程。我不想终止所有客户进程,而只想终止我将单击停止进程的客户。

【问题讨论】:

    标签: java multithreading java-8 thread-safety threadpool


    【解决方案1】:

    此代码不维护从租户到线程池的任何映射,只有一个对 ThreadPoolExecutor 的静态引用。每次initiateCherwellMoveThreadPool被调用时,任何现有的executor都会被替换为一个新的executor,而现有的executor并没有关闭,所以它会泄漏资源。因此,这将在同一线程池中执行来自多个租户的任务。

    此代码也不是线程安全的。在调用 setRejectedExecutionHandler 之前,线程可能(如果不太可能)在新创建的执行程序上安排任务,甚至关闭它。

    如果您需要每个租户有一个单独的执行者,则需要实施。例如,一个好的选择可能是使用带有 customerId 键和 ThreadPoolExecutor 值的 ConcurrentHashMap(为简洁起见,省略了日志记录):

    public class MoveToCherwellThreadPool {
        public static ConcurrentMap<String, ThreadPoolExecutor> cherwellMoveThreadPoolExecutors = new ConcurrentHashMap<>();
    
        public static ThreadPoolExecutor getCherwellMoveThreadPool(String customerId) {
            return cherwellMoveThreadPoolExecutors.computeIfAbsent(customerId, id -> {
                BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(100000);
                ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 20, TimeUnit.SECONDS, q);
                executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { /*...*/ });
                return executor;
            });
        }
    
        public static List<Runnable> stopCherwellMoveTheadPool(String customerId) {
            if (cherwellMoveThreadPoolExecutors.containsKey(customerId)) {
                return cherwellMoveThreadPoolExecutors.get(customerId).shutdownNow();
            }
            return Collections.emptyList();
        }
    
    }
    

    可以这样使用:

    CompletableFuture<?> future = CompletableFuture.runAsync(relationshipThread,
        MoveToCherwellThreadPool.getCherwellMoveThreadPool(customerId));
    

    同样重要的是要意识到调用shutdownNow只能试图取消当前正在执行的任务,并且“不等待主动执行的任务终止”:

    此实现通过 Thread.interrupt() 取消任务,因此任何未能响应中断的任务可能永远不会终止。

    未显示实现 CherwellRelationshipMoveThread 的代码,因此可能是也可能不是这种情况。

    【讨论】:

    • 你好蒂姆,如果你需要每个租户一个单独的执行者,这将需要实施。一个好的选择可能是使用带有 customerId 键和 ThreadPoolExecutor 值的 ConcurrentHashMap。你能举个例子吗?
    • 如果我为每个租户创建一个单独的线程池,我是否可以关闭该线程池,以便线程不会长时间进入等待/停放状态,直到它获得新任务。
    • 我也试图在我的 tomcat 服务器启动时初始化线程池,但由于我想为每个租户创建每个线程池,所以我想在触发我的计划任务时初始化线程池。
    • 任务是否在内部使用多线程?如果任务本身是单线程的,并且您希望线程在任务运行时启动并在任务完成时停止,那么您可能根本不需要线程池,而只需要一个普通线程。
    • 我有多个线程在内部运行。
    猜你喜欢
    • 2019-06-25
    • 2023-03-18
    • 1970-01-01
    • 2020-11-13
    • 1970-01-01
    • 1970-01-01
    • 2023-03-28
    • 2020-11-19
    • 2017-01-10
    相关资源
    最近更新 更多