【问题标题】:ExecutorService that executes tasks sequentially but takes threads from a poolExecutorService 按顺序执行任务但从池中获取线程
【发布时间】:2017-02-16 04:23:40
【问题描述】:

我正在尝试构建ExecutorService 的实现,我们称之为SequentialPooledExecutor,具有以下属性。

  1. SequentialPooledExecutor 的所有实例共享同一个线程池

  2. SequentialPooledExecutor 的同一实例的调用按顺序执行。

换句话说,实例在开始处理其队列中的下一个任务之前等待当前正在执行的任务终止。

我目前正在自己​​实施SequentialPooledExecutor,但我想知道我是否在重新发明轮子。我研究了ExecutorService 的不同实现,例如Executors 类提供的那些,但我没有找到满足我要求的实现。

你知道我是否缺少现有的实现,还是我应该继续自己实现接口?

编辑:

我觉得我的要求不是很清楚,看看能不能用其他语言来解释一下。

假设我有一系列会话,比如 1000 个(我之前称为执行器实例的东西)。我可以将任务提交到会话,并且我希望保证提交到同一会话的所有任务都按顺序执行。但是,属于不同会话的任务不应相互依赖。

我想定义一个 ExecutorService 来执行这些任务,但使用有限数量的线程,比如说 200,但确保在同一会话中的前一个任务完成之前不会启动任务。

我不知道是否有任何现有的东西已经这样做了,或者我是否应该自己实现这样的ExecutorService

【问题讨论】:

  • 为什么需要线程池?如果要按顺序调用它们,则只需要一个。我错过了什么?
  • @Erik 看下面的评论
  • 您肯定需要一些自定义实现。但是你的问题很有趣,你可以写另一个问题,包括你的设计细节和你遇到的任何问题。

标签: java multithreading threadpool executorservice threadpoolexecutor


【解决方案1】:

如果您想按顺序执行任务,只需创建一个带有只有一个线程ExecutorService,感谢Executors.newSingleThreadExecutor()

如果您有不同类型的任务,并且只想顺序执行相同类型的任务,您可以使用相同的单线程ExecutorService处理相同类型的任务,无需重新发明轮子。

假设你有1 000不同类型的任务,你可以使用200单线程ExecutorService,你唯一需要自己实现的是你总是需要使用相同的单线程针对给定类型的任务线程化ExecutorService

【讨论】:

  • newSingleThreadedExecutor 的问题是我无法绑定创建的线程数。如果我创建 1000 个执行器实例,我将在后台执行 1000 个线程。我想要的是拥有 1000 个执行程序实例,但只说 200 个线程执行任务。但是,我希望保证在同一执行程序实例上提交的任务是按顺序执行的。
  • 如果你想让它们顺序执行任务而不需要200个线程,那会浪费资源。仅使用多个线程来并行启动任务
  • 我不想按顺序执行所有个任务。只有属于同一个执行器实例的任务才应该顺序执行。如果两个任务属于不同的执行器实例,那么它们可以并行执行。
  • 那么你想要的是多个单线程执行器,每个都管理自己的线程?
  • 对不起,但对我来说,这完全是另一个问题,即使您决定实现 SequentialPooledExecutor,即使使用任意数量的线程,您也会遇到同样的问题,因为您想执行任务顺序,所以即使一个任务太长,你也需要等到它结束才能执行另一个任务。
【解决方案2】:

如果您有数千个必须按顺序处理的键,但您没有数千个内核,您可以使用散列策略来分配这样的工作

ExecutorService[] es = // many single threaded executors

public <T> Future<T> submit(String key, Callable<T> calls) {
    int h = Math.abs(key.hashCode() % es.length);
    return es[h].submit(calls);
}

一般来说,您只需要 2 * N 个线程来保持 N 个内核忙碌,如果您的任务受 CPU 限制,那么更多只会增加开销。

【讨论】:

    【解决方案3】:

    @Nicolas 的答案可能是您最好的选择,因为它简单、经过充分测试且高效。

    如果它不能满足您的要求,我会这样做:

    1. 不要将“SequentialPooledExecutor”作为执行器服务,将其作为单线程执行器服务“池”的外观
    2. 让您的“SequentialPooledExecutor”实现一个提交方法(采用 Runnable/Callable 和一个表示“队列名称”的字符串),返回一个 Future,就像一个执行器服务
    3. 在调用此方法时,通过获取队列名称的哈希并将其分派到相应的内部执行器,将您的“SequentialPooledExecutor”分派到其内部单线程执行器服务之一。

    在第 3 步发生的散列部分允许您将每个“队列名称”的任务始终转到“SequentialPooledExecutor”内的相同(单线程)执行程序服务。

    另一个可能的途径是使用CompletionStageCompletableFutures。实际上,这些是可听的期货(具有完成处理程序)。有了这些,你第一次有一个“会话”,你创建了一个CompletableFuture,你的第一个任务,并坚持下去。在每个新任务中,您将先前的未来与新任务结合起来,调用thenAcceptAsync(或任何类似的)。你得到的是一个执行任务的线性链。

    【讨论】:

    • 您的建议的问题是,如果我确定性地将会话绑定到执行程序,例如根据其哈希值,并且该会话变得无响应或非常慢,那么所有绑定到相同的会话执行者被阻止。
    • 没有解决方案:如果你不想要一个会话线程,那么会话最终将能够锁定其他会话。你有独立性,或者你没有......我猜。您可以在插入任务之前尝试某种“验证查询”(就像数据库连接池所做的那样),并在执行程序失败时删除它们,但是这样做,保持顺序执行要求,而不是删除任务将被证明是棘手的最少。
    • 我的想法是,如果池未满,会话会从池中获取一个新的执行程序,如果池满则等待。以这种方式,只有当池中的所有执行程序都很慢或没有响应时,会话才会被锁定。不过,这种情况不太可能发生。
    • 您在此级别建议的等待在概念上与您在第一条评论中不想要的等待没有什么不同。
    • 嗯,你不想要 any 池,因为你想要一个顺序执行......但我明白你的意思。我使用可完成的期货编辑了另一种潜在的解决方案。请注意,无论您做什么,如果您的任务可能变得缓慢或反应迟钝,那么您最终会陷入糟糕的境地。您应该考虑到这一点进行设计。无论您有 200 或 2000 个执行者,如果发生饱和堆积并填满。
    【解决方案4】:
    private Map<Integer, CompletableFuture<Void>> sessionTasks = new HashMap<>();
    private ExecutorService pool = Executors.newFixedThreadPool(200);
    
    public void submit(int sessionId, Runnable task) {  
        if (sessionTasks.containsKey(sessionId)) {
            sessionTasks.compute(sessionId, (i, c) -> c.thenRunAsync(task, pool));
        } else {
            sessionTasks.put(sessionId, CompletableFuture.runAsync(task, pool));
        }
    }
    

    如果会话没有任务,则会创建一个新任务并在提供的池中运行。如果在添加新任务时会话已经有任务,则后者将(使用thenRun)链接到前一个任务,以确保顺序。

    【讨论】:

    • 如果会话从多个线程注册(这肯定是预期的用例......),非并发映射实现可能会导致不可预知的行为。
    【解决方案5】:

    如果要配置有界队列,请使用ThreadPoolExecutor

    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
    TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
    

    对于您的用例,使用ThreadPoolExecutor

    ThreadPoolExecutor executor =    
    ThreadPoolExecutor(1,1,60,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1000));
    

    队列的代码上限大小为ThreadPoolExecutor为1000。如果要使用自定义拒绝执行处理程序,可以配置RejectedExeutionHandler

    相关的 SE 问题:

    How to properly use Java Executor?

    【讨论】:

      【解决方案6】:

      最近我遇到了同样的问题。 没有为此的内置类,但队列足够接近。 我的简单实现看起来像这样(也许它对其他人在同一问题上寻找示例有帮助)

      public class SerializedAsyncRunnerSimple implements Runnable {
      private final ExecutorService pool;
      protected final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); //thread safe queue
      protected final AtomicBoolean active = new AtomicBoolean(false);
      
      
      public SerializedAsyncRunnerSimple(ExecutorService threadPool) {this.pool = threadPool;}
      
      
      public void addWork(Runnable r){        
          queue.add(r);
          startExecutionIfInactive();
      }
      
      private void startExecutionIfInactive() {
          if(active.compareAndSet(false, true)) {
              pool.execute(this);
          }
      }
      
      @Override
      public synchronized void run() {
          while(!queue.isEmpty()){
              queue.poll().run();
          }
          active.set(false); //all future adds will not be executed on this thread anymore
          if(!queue.isEmpty()) { //if some items were added to the queue after the last queue.poll
              startExecutionIfInactive();// trigger an execution on next thread
          }
      }
      

      【讨论】:

        猜你喜欢
        • 2011-11-03
        • 2015-11-09
        • 1970-01-01
        • 1970-01-01
        • 2011-01-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多