【问题标题】:Thread Pool handling 'duplicate' tasks线程池处理“重复”任务
【发布时间】:2012-02-12 21:58:06
【问题描述】:

我想并行执行一些不同的任务,但有一个概念,如果一个任务已经排队或正在处理,它不会重新排队。我已经阅读了一些关于 Java API 的内容,并提出了下面的代码,这似乎可行。 任何人都可以阐明我使用的方法是否是最好的方法。任何危险(线程安全?)或更好的方法来做到这一点? 代码如下:

import java.util.HashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestExecution implements Runnable {
   String key1;
   String key2;   
   static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>();
   static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
   static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q);

   public static void main(String[] args) {
      try {
         execute(new TestExecution("A", "A"));
         execute(new TestExecution("A", "A"));
         execute(new TestExecution("B", "B"));
         Thread.sleep(8000);
         execute(new TestExecution("B", "B"));
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   static boolean execute(TestExecution e) {
      System.out.println("Handling "+e.key1+":"+e.key2);
      if (executions.containsKey(e)) {
         Future<?> f = (Future<?>) executions.get(e);
         if (f.isDone()) {
            System.out.println("Previous execution has completed");
            executions.remove(e);
         } else {
            System.out.println("Previous execution still running");
            return false;
         }         
      }
      else {
         System.out.println("No previous execution");
      }
      Future<?> f = tpe.submit(e);
      executions.put(e, f);            
      return true;
   }

   public TestExecution(String key1, String key2) {
      this.key1 = key1;
      this.key2 = key2;      
   }

   public boolean equals(Object obj)
   {
       if (obj instanceof TestExecution)
       {
          TestExecution t = (TestExecution) obj;
           return (key1.equals(t.key1) && key2.equals(t.key2));           
       }       
       return false;
   }

   public int hashCode ()
   {
      return key1.hashCode()+key2.hashCode();
   }

   public void run() {      
      try {
         System.out.println("Start processing "+key1+":"+key2);
         Thread.sleep(4000);
         System.out.println("Finish processing "+key1+":"+key2);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }      
   }              
}

关注下方评论:
计划是触发要执行的任务将由 cron 调用 RESTful Web 服务来处理。例如,下面是每天 9:30 触发的一项任务的设置,以及每两分钟安排的另一项任务。

0/2 * * * * restclient.pl key11 key12 
30 09 * * * restclient.pl key21 key22

在这种情况下,如果任务 key11:key12 正在运行,或者已经排队运行,我不想排队另一个实例。我知道我们还有其他调度选项,但是我们倾向于将 cron 用于其他任务,所以我想尽量保留这个。

第二次更新。为了响应 cmets 到目前为止我已经重写了代码,您能否评论以下更新解决方案的任何问题?

import java.util.concurrent.LinkedBlockingQueue;

public class TestExecution implements Runnable {
   String key1;
   String key2;      
   static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>());

   public static void main(String[] args) {
      try {
         tpe.execute(new TestExecution("A", "A"));
         tpe.execute(new TestExecution("A", "A"));
         tpe.execute(new TestExecution("B", "B"));
         Thread.sleep(8000);
         tpe.execute(new TestExecution("B", "B"));
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   public TestExecution(String key1, String key2) {
      this.key1 = key1;
      this.key2 = key2;      
   }

   public boolean equals(Object obj)
   {
       if (obj instanceof TestExecution)
       {
          TestExecution t = (TestExecution) obj;
           return (key1.equals(t.key1) && key2.equals(t.key2));           
       }       
       return false;
   }

   public int hashCode ()
   {
      return key1.hashCode()+key2.hashCode();
   }

   public void run() {      
      try {
         System.out.println("Start processing "+key1+":"+key2);
         Thread.sleep(4000);
         System.out.println("Finish processing "+key1+":"+key2);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }      
   }
}


import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class TestThreadPoolExecutor extends ThreadPoolExecutor {
   Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>());

   public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {      
      super(2, 5, 1, TimeUnit.MINUTES, q);      
   }

   public void execute(Runnable command) {
      if (executions.contains(command)) {
         System.out.println("Previous execution still running");
         return;
      }
      else {
         System.out.println("No previous execution");
      }
      super.execute(command);      
      executions.add(command);      
   }

   protected void afterExecute(Runnable r, Throwable t) {
      super.afterExecute(r, t);        
      executions.remove(r);
   }      
}

【问题讨论】:

  • 为什么不在 TestExecution 中使用 hashset 而不是 HashMap??

标签: java multithreading threadpool


【解决方案1】:

这是我将如何处理和避免重复

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.*;

public class TestExecution implements Callable<Void> {
    private static final ThreadPoolExecutor TPE = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
    private static final Set<TestExecution> TE_SET = Collections.newSetFromMap(new ConcurrentHashMap<TestExecution, Boolean>());

    private final String key1;
    private final String key2;

    public static void main(String... args) throws InterruptedException {
        new TestExecution("A", "A").execute();
        new TestExecution("A", "A").execute();
        new TestExecution("B", "B").execute();
        Thread.sleep(8000);
        new TestExecution("A", "A").execute();
        new TestExecution("B", "B").execute();
        new TestExecution("B", "B").execute();
        TPE.shutdown();
    }

    public TestExecution(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    void execute() {
        if (TE_SET.add(this)) {
            System.out.println("Handling " + this);
            TPE.submit(this);
        } else {
            System.out.println("... ignoring duplicate " + this);
        }
    }

    public boolean equals(Object obj) {
        return obj instanceof TestExecution &&
                key1.equals(((TestExecution) obj).key1) &&
                key2.equals(((TestExecution) obj).key2);
    }

    public int hashCode() {
        return key1.hashCode() * 31 + key2.hashCode();
    }

    @Override
    public Void call() throws InterruptedException {
        if (!TE_SET.remove(this)) {
            System.out.println("... dropping duplicate " + this);
            return null;
        }
        System.out.println("Start processing " + this);
        Thread.sleep(4000);
        System.out.println("Finish processing " + this);
        return null;
    }

    public String toString() {
        return key1 + ':' + key2;
    }
}

打印

Handling A:A
... ignoring duplicate A:A
Handling B:B
Start processing A:A
Start processing B:B
Finish processing A:A
Finish processing B:B
Handling A:A
Handling B:B
Start processing A:A
Start processing B:B
... ignoring duplicate B:B
Finish processing B:B
Finish processing A:A

【讨论】:

  • 好的,谢谢,那里有一些好的指针,特别是通过使用 ConcurrentHashMap 和覆盖 toString 方法来避免多线程问题。几个问题。为什么不使用 HashSet(是因为没有等效的线程安全对象可以使用?)另外我不明白从 HashMap 中删除的代码。您似乎在处理开始时执行此操作,不应该在处理结束时执行吗?
  • 你可以使用Collections.synchronizedSet(new HashSet()) 这是线程安全的,但不是并发的。它是在开始还是结束取决于您的要求。是偶尔做两件事情还是偶尔不做事情更好(因为新任务是在任务结束和被移除之间添加的)
  • 好的,我真的不知道“线程安全”和“并发”之间的区别,也许我需要做一些调查。但是对于删除元素,如果我不想重新执行(或排队)已经开始的作业,那么 TE_SET.remove 应该移到 Call 函数的末尾,对吗?假设“删除重复”案例是错误案例,我是否正确?如果我们在调用函数中,那么一个元素应该一直被写入 HashSet 对吧?
  • 我的定义:线程安全意味着它在被多个线程使用时具有可预测且一致的状态。顺便说一句:这并不一定意味着锁定。并发意味着它可以被多个线程使用而不会相互阻塞。
  • 把它放在最后是你想要的。对于我工作的系统,一个罕见的重复比放弃一个任务要好。
【解决方案2】:

几个cmets:

  • 在执行方法中,如果多个线程同时调用此方法,您将在“执行”(containsKey)的读取和写入(删除或放置)之间获得竞争条件。您需要将所有对“执行”的调用包装在同步块中,这些调用应该是原子的。 (在您的情况下,使方法同步将起作用)http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
  • 您应该使用单例而不是静态(即全局)变量来处理状态

但我真的很想进一步了解您的设计,以了解您想要实现的目标。为什么一个任务会被多次排队执行?

【讨论】:

  • 谢谢,我已经用更多信息更新了这个问题。
  • 对于更面向对象的设计,我会考虑继承 ThreadPoolExecutor 并将用于管理执行映射的代码放在 execute() 和 afterExecute() 函数中。 (在我看来,调用 execute() 而不是 submit() 更正确,但规范在这一点上并不明确)
  • 干杯,已根据您的建议重写了代码。这样看起来好些了吗?
  • 您似乎通过使用 ConcurrentHashMap 的原子指令避免了竞态条件(我必须说比在 executions() 中添加“同步”更优雅的解决方案)。 TestThreadPoolExecutor 也不错,它将所有队列处理代码封装在 Executor 中,避免了全局变量。最后,您可以在 TestExecution 构造函数中为 null 添加检查。