【问题标题】:How to implement blocking thread pool executor?如何实现阻塞线程池执行器?
【发布时间】:2017-11-22 20:39:01
【问题描述】:

我们有一个大文本文件,其中每一行都需要大量的process。设计是有一个class,它读取文件并将每一行的处理委托给thread,通过thread pool。一旦池中没有空闲线程来进行处理,应该阻止文件读取器类读取下一行。所以我需要一个blocking thread pool

在当前的实现中,ThreadPoolExecutor.submit()ThreadPoolExecutor.execute() 方法在配置的线程数变忙后抛出 RejectedExecutionException 异常,如下面的代码 sn-p 所示。

public class BlockingTp {

    public static void main(String[] args) {
        BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
        ThreadPoolExecutor executorService=
            new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, blockingQueue);
        int Jobs = 10;
        System.out.println("Starting application with " + Jobs + " jobs");
        for (int i = 1; i <= Jobs; i++)
            try {
                executorService.submit(new WorkerThread(i));
                System.out.println("job added " + (i));
            } catch (RejectedExecutionException e) {
                System.err.println("RejectedExecutionException");
            }
    }
}

class WorkerThread implements Runnable {
    int job;
    public WorkerThread(int job) {
        this.job = job;
    }
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (Exception excep) {
        }
    }
}

上述程序的输出是

Starting application to add 10 jobs
Added job #1
Added job #2
Added job #3
Added job #4
Added job #5
Added job #6
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException

有人可以说明一下,即我如何实现阻塞线程池

【问题讨论】:

  • 我先误读了你的问题;并提出一个虚假的答案;我希望修改后的版本对你有用。
  • @GhostCat- 完成谢谢。

标签: java multithreading threadpoolexecutor


【解决方案1】:

有人能说明一下吗,即我如何实现阻塞线程池。

您需要在执行器服务上设置拒绝执行处理程序。当线程去将作业放入执行器时,它会阻塞,直到阻塞队列中有空间。

BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
ThreadPoolExecutor executorService =
     new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, arrayBlockingQueue);
// when the blocking queue is full, this tries to put into the queue which blocks
executorService.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // block until there's room
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Producer interrupted", e);
        }
    }
});

因此,TRE 不会抛出 RejectedExecutionException,而是调用拒绝处理程序,该处理程序反过来会尝试将作业放回队列中。这会阻止调用者。

【讨论】:

  • 像这样直接插入工作队列是不安全的,因为它绕过了 ThreadPoolExecutor 用来管理其工作池状态的逻辑。例如,它没有正确检查池是否已关闭,这意味着对 execute() 的调用可能会挂起,而不是正确拒绝任务。
  • 好点@David。您知道实现阻塞池的更好方法吗?
  • Thread.currentThread().interrupt() 这里的意义/目标是什么?
  • 这是一个常见问题解答@yaseco。每当您编写捕获InterruptedException 的代码时,您应该立即重新中断线程。抛出异常会清除线程的中断标志,通常您希望确保调用者知道线程已被中断。请参阅:stackoverflow.com/a/3976377/179850dzone.com/articles/how-to-handle-the-interruptedexception
【解决方案2】:

让我们再看看你的代码:

for (int i = 1; i <= Jobs; i++)
  try {
    tpExe.submit(new WorkerThread(i));
    System.out.println("job added " + (i));
  } catch (RejectedExecutionException e) {
    System.err.println("RejectedExecutionException");
  }

所以 - 当您尝试提交并且池很忙时,就会引发该异常。如果你想绕过它,它可能看起来像:

public void yourSubmit(Runnable whatever) {
  boolean submitted = false;
  while (! submitted ) {
    try {
      tpExe.submit(new WorkerThread(whatever));
      submitted = true;
    } catch (RejectedExecutionException re) {
      // all threads busy ... so wait some time
      Thread.sleep(1000);
    }

换句话说:将该异常用作当前无法提交的“标记”。

【讨论】:

    【解决方案3】:

    这对我有用。

    class handler implements RejectedExecutionHandler{
        @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
    
                }
    

    【讨论】:

    • 不要这样回答。而是将其放入您的问题中。
    【解决方案4】:

    您可以使用信号量来控制资源。读取器将通过获取信号量来读取并创建异步任务。如果每个线程都忙,读取器线程将等待直到线程可用。

    public class MyExecutor {
        private final Executor exec;
        private final Semaphore semaphore;
    
        public BoundedExecutor(Executor exec, int bound) {
            this.exec = exec;
            this.semaphore = new Semaphore(bound);
        }
    
        public void submitTask(final Runnable command)
                throws InterruptedException, RejectedExecutionException {
            semaphore.acquire();
            try {
                exec.execute(new Runnable() {
                    public void run() {
                        try {
                            command.run();
                        } finally {
                            semaphore.release();
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                semaphore.release();
                throw e;
            }
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2011-05-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-09-17
      相关资源
      最近更新 更多