【问题标题】:How to code run method in Thread pooling如何在线程池中编写运行方法
【发布时间】:2013-01-22 13:16:24
【问题描述】:

阅读线程池,我感到非常困惑。我了解了这个概念,它们实际上是如何工作的。 但我在这部分感到困惑,如何编码。

我在网上搜索了很多。最后我得到了一个博客,里面有代码,如下所示,

条件是,不要使用内置类

代码 1

public class ThreadPool {

  private BlockingQueue taskQueue = null;
  private List<PoolThread> threads = new ArrayList<PoolThread>();
  private boolean isStopped = false;

  public ThreadPool(int noOfThreads, int maxNoOfTasks){
    taskQueue = new BlockingQueue(maxNoOfTasks);

    for(int i=0; i<noOfThreads; i++){
      threads.add(new PoolThread(taskQueue));
    }
    for(PoolThread thread : threads){
      thread.start();
    }
  }

  public void synchronized execute(Runnable task){
    if(this.isStopped) throw
      new IllegalStateException("ThreadPool is stopped");

    this.taskQueue.enqueue(task);
  }

  public synchronized void stop(){
    this.isStopped = true;
    for(PoolThread thread : threads){
      thread.stop();
    }
  }

}

代码 2

public class PoolThread extends Thread {
  private BlockingQueue taskQueue = null;
  private boolean       isStopped = false;
  public PoolThread(BlockingQueue queue){
    taskQueue = queue;
  }
  public void run(){
    while(!isStopped()){
      try{
        Runnable runnable = (Runnable) taskQueue.dequeue();
        runnable.run();
      } catch(Exception e){
        //log or otherwise report exception,
        //but keep pool thread alive.
      }
    }
  }
  public synchronized void stop(){
    isStopped = true;
    this.interrupt(); //break pool thread out of dequeue() call.
  }
  public synchronized void isStopped(){
    return isStopped;
  }
}

代码 3:-

public class BlockingQueue {

  private List queue = new LinkedList();
  private int  limit = 10;

  public BlockingQueue(int limit){
    this.limit = limit;
  }

  public synchronized void enqueue(Object item)
  throws InterruptedException  {
    while(this.queue.size() == this.limit) {
      wait();
    }
    if(this.queue.size() == 0) {
      notifyAll();
    }
    this.queue.add(item);
  }

  public synchronized Object dequeue()
  throws InterruptedException{
    while(this.queue.size() == 0){
      wait();
    }
    if(this.queue.size() == this.limit){
      notifyAll();
    }

    return this.queue.remove(0);
  }    
}

我试图理解这段代码的作用。 但我不明白这段代码的流程。你能帮我理解这段代码吗?

Mainly I have problems in **Code 2 :- run method**

Why execute method's argument are of Runnable type?

How input array given to this code??

帮帮我。

提前致谢。

【问题讨论】:

  • 为什么一定要自己写线程池?最容易使用 Executors.*ThreadPool() 方法来创建线程池,然后在其上使用 submit(*) 方法来提交 Callable 或 Runnable。
  • @allprog:我编写了那个代码,这个代码很容易构建。但是我的老师要求在不使用内置类的情况下构建它。这就是我发布这个的原因。
  • 很好,你让社区检查作业。 :) 不幸的是,正确地执行 ExecutorService 很难。如果您为异步 Future 接口提供取消功能,那么您必须非常小心。您应该查看 JDK 源代码以获得正确的实现。它会比你现在拥有的复杂得多。
  • @allprog:不,我不是让社区来检查代码。我花了很多时间,最后我的小模块有问题,然后才问这个答案。我会照顾你的建议。谢谢:)
  • 放轻松。问总比不知道答案好。 :) 您也应该对 BlockingQueue 使用泛型,但在任​​何情况下都不要将 Object 类型用于入队和出队。它会破坏你的代码!这个功能在JDK中有一个同名的实现,你不能用吗?我认为 ExecutorService (正确实施)已经比简单的家庭作业或作业要复杂得多。

标签: java multithreading threadpool


【解决方案1】:
  public void run(){
    while(!isStopped()){

循环直到线程池停止。

      try{
        Runnable runnable = (Runnable) taskQueue.dequeue();

将头任务从任务队列中拉出。

        runnable.run();

运行任务。

      } catch(Exception e){
        //log or otherwise report exception,
        //but keep pool thread alive.

如果任务抛出异常,不要做任何特别的事情,只是不要传递它。

      }
    }
  }

【讨论】:

  • ` public PoolThread(BlockingQueue queue){ taskQueue = queue; }` 据我了解,这些行可以将任务队列的副本提供给所有线程工作者。我说的对吗?
  • 再告诉我一件事,我们如何将 inut 数组赋予 code 1 。因为要开始我们必须调用代码 1 的执行方法。
  • @FreakyCheeky:第一个问题:是的。每个线程都获得对同一队列的引用。第二个问题:你调用了execute函数,它把任务放到了共享队列中。
  • 告诉我一件事,为什么执行 ethod 参数是 Runnable 类型的?我不明白这个。
  • runnable 的文档中对此进行了解释。它正是为此目的而设计的。
【解决方案2】:

编辑:

我现在明白这是一个课堂项目,但我会将我的答案留给后人。

如果您尝试在 Java 下使用线程池,那么 java.util.concurrent.* 类已经为您实现了所有这些。其他答案解决并解释您的特定代码。

例如,您需要使用ExecutorService 代码设置线程池。在封面下面ExecutorService 处理线程并使用LinkedBlockingQueue。你定义了MyJob 类,它实现了'Runnable'并完成了池中线程运行的工作。根据您的需要,它可以是短期或长期运行的任务。

// create a thread pool with 10 workers
ExecutorService threadPool = Executors.newFixedThreadPool(10);
// or you can create an open-ended thread pool
// ExecutorService threadPool = Executors.newCachedThreadPool();
// define your jobs somehow
for (MyJob job : jobsToDo) {
    threadPool.submit(job);
}
// once we have submitted all jobs to the thread pool, it should be shutdown
threadPool.shutdown();
...
public class MyJob implements Runnable {
    // you can construct your jobs and pass in context for them if necessary
    public MyJob(String someContext) {
        ...
    }
    public void run() {
        // process the job
    }
}

【讨论】:

  • 其实这段代码我是用Built-in class做的,但是老师说要自己做,不用Built-in class
猜你喜欢
  • 2014-01-03
  • 2015-09-08
  • 2011-05-14
  • 2018-01-01
  • 1970-01-01
  • 2013-04-05
  • 2013-03-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多