【问题标题】:Controlling Task execution order with ExecutorService使用 ExecutorService 控制任务执行顺序
【发布时间】:2011-01-10 08:28:09
【问题描述】:

我有一个将异步任务委托给线程池的进程。我需要确保某些任务按顺序执行。 比如

任务按顺序到达

任务 a1、b1、c1、d1、e1、a2、a3、b2、f1

任务可以按任何顺序执行,除非存在自然依赖关系,因此必须按该顺序处理 a1、a2、a3,方法是分配给同一个线程或阻塞这些线程,直到我知道前一个 a# 任务已完成。

目前它不使用 Java Concurrency 包,但我正在考虑更改以利用线程管理。

有没有人有类似的解决方案或如何实现这一点的建议

【问题讨论】:

    标签: java concurrency executorservice


    【解决方案1】:

    当您向ExecutorService 提交RunnableCallable 时,您将收到Future 作为回报。将依赖 a1 的线程传递给 a1 的 Future 并调用 Future.get()。这将阻塞,直到线程完成。

    所以:

    ExecutorService exec = Executor.newFixedThreadPool(5);
    Runnable a1 = ...
    final Future f1 = exec.submit(a1);
    Runnable a2 = new Runnable() {
      @Override
      public void run() {
        f1.get();
        ... // do stuff
      }
    }
    exec.submit(a2);
    

    等等。

    【讨论】:

    • 我认为这不适用于固定线程池,因为线程可能会同时阻塞 f1.get() 并死锁。
    • 适当调整池的大小。
    • 缓存线程池有其自身的问题。如果提交过多,主题创建可能会失控。
    【解决方案2】:

    另一种选择是创建自己的执行器,将其称为 OrderedExecutor,并创建一个封装的 ThreadPoolExecutor 对象数组,每个内部执行器有 1 个线程。然后,您提供一种机制来选择其中一个内部对象,例如,您可以通过提供您的类的用户可以实现的接口来做到这一点:

    executor = new OrderedExecutor( 10 /* 池大小 */, new OrderedExecutor.Chooser() { 公共 int 选择(可运行可运行){ MyRunnable myRunnable = (MyRunnable)runnable; 返回 myRunnable.someId(); }); executor.execute(new MyRunnable());

    OrderedExecutor.execute() 的实现然后将使用选择器获取一个 int,您使用池大小对其进行修改,这就是您在内部数组中的索引。想法是“someId()”将为所有“a”等返回相同的值。

    【讨论】:

      【解决方案3】:

      当我过去这样做时,我通常由组件处理排序,然后将可调用/可运行对象提交给执行器。

      类似的东西。

      • 获得了要运行的任务列表,其中一些具有依赖关系
      • 创建 Executor 并使用 ExecutorCompletionService 进行包装
      • 搜索所有任务,任何没有依赖关系的任务,通过完成服务安排它们
      • 轮询完成服务
      • 随着每项任务的完成
        • 将其添加到“已完成”列表中
        • 重新评估“已完成列表”中的所有等待任务,以查看它们是否“相关性完成”。如果是这样安排他们
        • 冲洗重复,直到所有任务都提交/完成

      完成服务是一种能够在任务完成时获取任务的好方法,而不是尝试轮询一堆 Futures。但是,您可能希望保留一个Map<Future, TaskIdentifier>,当通过完成服务安排任务时填充该地址,以便当完成服务为您提供已完成的 Future 时,您可以确定它是哪个 TaskIdentifier

      如果您发现自己处于任务仍在等待运行的状态,但没有任何东西在运行并且无法安排任何事情,那么您就有了循环依赖问题。

      【讨论】:

      • 你能给我们举个例子吗
      【解决方案4】:

      我编写了自己的 Executor 来保证具有相同键的任务的任务排序。它使用队列映射来处理具有相同键的订单任务。每个键控任务使用相同的键执行下一个任务。

      此解决方案不处理 RejectedExecutionException 或委托执行器的其他异常!所以委派的Executor应该是“无限的”。

      import java.util.HashMap;
      import java.util.LinkedList;
      import java.util.Map;
      import java.util.Queue;
      import java.util.concurrent.Executor;
      
      /**
      * This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly).
      */
      public class OrderingExecutor implements Executor{
      
          private final Executor delegate;
          private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>();
      
          public OrderingExecutor(Executor delegate){
              this.delegate = delegate;
          }
      
          @Override
          public void execute(Runnable task) {
              // task without key can be executed immediately
              delegate.execute(task);
          }
      
          public void execute(Runnable task, Object key) {
              if (key == null){ // if key is null, execute without ordering
                  execute(task);
                  return;
              }
      
              boolean first;
              Runnable wrappedTask;
              synchronized (keyedTasks){
                  Queue<Runnable> dependencyQueue = keyedTasks.get(key);
                  first = (dependencyQueue == null);
                  if (dependencyQueue == null){
                      dependencyQueue = new LinkedList<Runnable>();
                      keyedTasks.put(key, dependencyQueue);
                  }
      
                  wrappedTask = wrap(task, dependencyQueue, key);
                  if (!first)
                      dependencyQueue.add(wrappedTask);
              }
      
              // execute method can block, call it outside synchronize block
              if (first)
                  delegate.execute(wrappedTask);
      
          }
      
          private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
              return new OrderedTask(task, dependencyQueue, key);
          }
      
          class OrderedTask implements Runnable{
      
              private final Queue<Runnable> dependencyQueue;
              private final Runnable task;
              private final Object key;
      
              public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
                  this.task = task;
                  this.dependencyQueue = dependencyQueue;
                  this.key = key;
              }
      
              @Override
              public void run() {
                  try{
                      task.run();
                  } finally {
                      Runnable nextTask = null;
                      synchronized (keyedTasks){
                          if (dependencyQueue.isEmpty()){
                              keyedTasks.remove(key);
                          }else{
                              nextTask = dependencyQueue.poll();
                          }
                      }
                      if (nextTask!=null)
                          delegate.execute(nextTask);
                  }
              }
          }
      }
      

      【讨论】:

      • +1。感谢那。我会使用这个植入,但我真的不知道这怎么没有被标记为问题的最终答案。
      【解决方案5】:

      Habanero-Java library中,有一个数据驱动任务的概念,可以用来表达任务之间的依赖关系,避免线程阻塞操作。在幕后 Habanero-Java 库使用 JDK 的 ForkJoinPool(即 ExecutorService)。

      例如,您的任务 A1、A2、A3、... 的用例可以表示如下:

      HjFuture a1 = future(() -> { doA1(); return true; });
      HjFuture a2 = futureAwait(a1, () -> { doA2(); return true; });
      HjFuture a3 = futureAwait(a2, () -> { doA3(); return true; });
      

      请注意,a1、a2 和 a3 只是对 HjFuture 类型的对象的引用,并且可以在您的自定义数据结构中进行维护,以在任务 A2 和 A3 在运行时进入时指定依赖关系。

      有一些tutorial slides available。 您可以找到更多文档,如 javadocAPI summaryprimers

      【讨论】:

        【解决方案6】:

        您可以使用 Executors.newSingleThreadExecutor(),但它只会使用一个线程来执行您的任务。另一种选择是使用 CountDownLatch。这是一个简单的例子:

        public class Main2 {
        
        public static void main(String[] args) throws InterruptedException {
        
            final CountDownLatch cdl1 = new CountDownLatch(1);
            final CountDownLatch cdl2 = new CountDownLatch(1);
            final CountDownLatch cdl3 = new CountDownLatch(1);
        
            List<Runnable> list = new ArrayList<Runnable>();
            list.add(new Runnable() {
                public void run() {
                    System.out.println("Task 1");
        
                    // inform that task 1 is finished
                    cdl1.countDown();
                }
            });
        
            list.add(new Runnable() {
                public void run() {
                    // wait until task 1 is finished
                    try {
                        cdl1.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
        
                    System.out.println("Task 2");
        
                    // inform that task 2 is finished
                    cdl2.countDown();
                }
            });
        
            list.add(new Runnable() {
                public void run() {
                    // wait until task 2 is finished
                    try {
                        cdl2.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
        
                    System.out.println("Task 3");
        
                    // inform that task 3 is finished
                    cdl3.countDown();
                }
            });
        
            ExecutorService es = Executors.newFixedThreadPool(200);
            for (int i = 0; i < 3; i++) {
                es.submit(list.get(i));
            }
        
            es.shutdown();
            es.awaitTermination(1, TimeUnit.MINUTES);
        }
        }
        

        【讨论】:

          【解决方案7】:

          我为这个问题创建了一个 OrderingExecutor。如果将相同的 key 传递给具有不同 runnable 的方法 execute(),则具有相同 key 的 runnable 的执行将按照 execute() 的调用顺序,并且永远不会重叠。

          import java.util.Arrays;
          import java.util.Collection;
          import java.util.Iterator;
          import java.util.Queue;
          import java.util.concurrent.ConcurrentHashMap;
          import java.util.concurrent.ConcurrentLinkedQueue;
          import java.util.concurrent.ConcurrentMap;
          import java.util.concurrent.Executor;
          
          /**
           * Special executor which can order the tasks if a common key is given.
           * Runnables submitted with non-null key will guaranteed to run in order for the same key.
           *
           */
          public class OrderedExecutor {
          
              private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>(
                      new ConcurrentLinkedQueue<Runnable>());
          
              private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>();
              private Executor delegate;
              private volatile boolean stopped;
          
              public OrderedExecutor(Executor delegate) {
                  this.delegate = delegate;
              }
          
              public void execute(Runnable runnable, Object key) {
                  if (stopped) {
                      return;
                  }
          
                  if (key == null) {
                      delegate.execute(runnable);
                      return;
                  }
          
                  Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> {
                      v.add(runnable);
                      return v;
                  });
                  if (queueForKey == null) {
                      // There was no running task with this key
                      Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>());
                      newQ.add(runnable);
                      // Use putIfAbsent because this execute() method can be called concurrently as well
                      queueForKey = taskMap.putIfAbsent(key, newQ);
                      if (queueForKey != null)
                          queueForKey.add(runnable);
                      delegate.execute(new InternalRunnable(key));
                  }
              }
          
              public void shutdown() {
                  stopped = true;
                  taskMap.clear();
              }
          
              /**
               * Own Runnable used by OrderedExecutor.
               * The runnable is associated with a specific key - the Queue&lt;Runnable> for this
               * key is polled.
               * If the queue is empty, it tries to remove the queue from taskMap. 
               *
               */
              private class InternalRunnable implements Runnable {
          
                  private Object key;
          
                  public InternalRunnable(Object key) {
                      this.key = key;
                  }
          
                  @Override
                  public void run() {
                      while (true) {
                          // There must be at least one task now
                          Runnable r = taskMap.get(key).poll();
                          while (r != null) {
                              r.run();
                              r = taskMap.get(key).poll();
                          }
                          // The queue emptied
                          // Remove from the map if and only if the queue is really empty
                          boolean removed = taskMap.remove(key, EMPTY_QUEUE);
                          if (removed) {
                              // The queue has been removed from the map,
                              // if a new task arrives with the same key, a new InternalRunnable
                              // will be created
                              break;
                          } // If the queue has not been removed from the map it means that someone put a task into it
                            // so we can safely continue the loop
                      }
                  }
              }
          
              /**
               * Special Queue implementation, with equals() and hashCode() methods.
               * By default, Java SE queues use identity equals() and default hashCode() methods.
               * This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()).
               *
               * @param <E> The type of elements in the queue.
               */
              private static class QueueWithHashCodeAndEquals<E> implements Queue<E> {
          
                  private Queue<E> delegate;
          
                  public QueueWithHashCodeAndEquals(Queue<E> delegate) {
                      this.delegate = delegate;
                  }
          
                  public boolean add(E e) {
                      return delegate.add(e);
                  }
          
                  public boolean offer(E e) {
                      return delegate.offer(e);
                  }
          
                  public int size() {
                      return delegate.size();
                  }
          
                  public boolean isEmpty() {
                      return delegate.isEmpty();
                  }
          
                  public boolean contains(Object o) {
                      return delegate.contains(o);
                  }
          
                  public E remove() {
                      return delegate.remove();
                  }
          
                  public E poll() {
                      return delegate.poll();
                  }
          
                  public E element() {
                      return delegate.element();
                  }
          
                  public Iterator<E> iterator() {
                      return delegate.iterator();
                  }
          
                  public E peek() {
                      return delegate.peek();
                  }
          
                  public Object[] toArray() {
                      return delegate.toArray();
                  }
          
                  public <T> T[] toArray(T[] a) {
                      return delegate.toArray(a);
                  }
          
                  public boolean remove(Object o) {
                      return delegate.remove(o);
                  }
          
                  public boolean containsAll(Collection<?> c) {
                      return delegate.containsAll(c);
                  }
          
                  public boolean addAll(Collection<? extends E> c) {
                      return delegate.addAll(c);
                  }
          
                  public boolean removeAll(Collection<?> c) {
                      return delegate.removeAll(c);
                  }
          
                  public boolean retainAll(Collection<?> c) {
                      return delegate.retainAll(c);
                  }
          
                  public void clear() {
                      delegate.clear();
                  }
          
                  @Override
                  public boolean equals(Object obj) {
                      if (!(obj instanceof QueueWithHashCodeAndEquals)) {
                          return false;
                      }
                      QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj;
                      return Arrays.equals(toArray(), other.toArray());
                  }
          
                  @Override
                  public int hashCode() {
                      return Arrays.hashCode(toArray());
                  }
          
              }
          
          }
          

          【讨论】:

            【解决方案8】:

            我已经编写了我赢得的执行程序服务,它是序列感知的。它对包含某些相关参考和当前进行中的任务进行排序。

            你可以在https://github.com/nenapu/SequenceAwareExecutorService查看实现

            【讨论】:

              猜你喜欢
              • 2015-11-09
              • 1970-01-01
              • 2017-02-16
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2019-05-31
              相关资源
              最近更新 更多