【问题标题】:Custom thread pool implementation in javajava中的自定义线程池实现
【发布时间】:2019-09-14 12:06:38
【问题描述】:

出于学习目的,我试图在 java 中实现我自己的线程池。以下是我已经实施的。我对这个实现有几个问题:

  1. 虽然我正在使用 BlockingQueue,比如内置的 java Executors 希望我们提供 Runnable 对象(通过 execute 方法)。但就我而言,我觉得我可以创建任何对象而不是 Runnable。那么为什么 Java 执行器期望 Runnable,我尝试查看源代码但仍然无法弄清楚。

  2. 这个原始实现还有什么问题吗?

请找到代码。

公共类 CustomThreadPool {

private final BlockingQueue<Runnable> blockingQueue;
private final Worker[] workers;

public CustomThreadPool(final int numOfThreads) {
    blockingQueue = new LinkedBlockingQueue<>();
    workers = new Worker[numOfThreads];

    for (int i = 0; i < numOfThreads; i++) {
        workers[i] = new Worker();
        workers[i].start();
    }
}

public void execute(final Runnable task) {
    blockingQueue.add(task);
}

public void shutdownImmediately() {
    for (int i = 0; i < workers.length; i++) {
        workers[i].shutdownSignal = true;
        workers[i] = null;
    }
}

 private class Worker extends Thread {
    private Runnable taskToPerform = null;
    boolean shutdownSignal = false;

    @Override
    public void run() {
        while(true && !shutdownSignal) {
            taskToPerform = blockingQueue.poll();
            if (taskToPerform != null) {
                taskToPerform.run();
            }
            if(shutdownSignal) {
                break;
            }
        }
    }
}

public static void main(String[] args) throws Exception {
    final CustomThreadPool threadPool = new CustomThreadPool(5);
    for (int i = 0; i < 20; i++) {
        threadPool.execute(() -> System.out.println(Thread.currentThread().getName()));
    }
    Thread.sleep(1*1000);
    threadPool.shutdownImmediately();
}

}

【问题讨论】:

    标签: java multithreading synchronization threadpool


    【解决方案1】:
    1. Executor 需要 Runnable 或 Callable,因为它在运行您提交的任务时会调用这些接口的 runcall 方法。

    2. 在您的实现中您不使用BlockingQueue 的阻塞方面。当您的队列中不存在任务时,您的线程池线程将在您的 while(true &amp;&amp; !shutdownSignal) 循环上不断旋转(占用 cpu 时间)。因为 poll() 方法没有阻塞。这不是您在实现线程池时想要的。

    您应该使用其中一种阻止方法而不是poll()

    您可以使用poll(long timeout,TimeUnit unit) 方法来获取超时参数。在这种情况下,如果您在任何线程池线程等待此调用时调用您的 shutdownImmediately 方法。他们将等待超时持续时间,并且 poll 将向他们返回 null 他们将看到 shutdownSignal 正在设置并退出循环。如果你不想让他们等待超时,你也可以调用中断方法。

     // in run method
     try {
         taskToPerform = blockingQueue.poll(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
         break; // this thread is interrupted. Time to leave this loop
     }
    
    // in shutdownImmediately method 
    workers[i].shutdownSignal = true;
    // If You don't call interrupt, Threads will wait at max 5 sec for this case.
    workers[i].interrupt(); 
    workers[i] = null;
    

    或者您可以使用take() 方法,该方法在队列中没有任何内容时阻塞。但是在这种情况下,当您调用 shutdownImmediately 方法时,您必须中断可能正在等待 take 方法调用的线程。否则他们会卡在 take() 调用中。

    // in run method
    try {
       taskToPerform = blockingQueue.take();
    } catch (InterruptedException e) {
       break; // this thread is interrupted. Time to leave this loop
    }
    
    // in shutdownImmediately method 
    workers[i].shutdownSignal = true;
    workers[i].interrupt(); // this is crucial for this case
    workers[i] = null;
    

    【讨论】:

      猜你喜欢
      • 2020-11-19
      • 2014-02-05
      • 2010-09-27
      • 2011-11-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-22
      • 2010-11-05
      相关资源
      最近更新 更多