【问题标题】:Run 100 threads in parallel and run missing threads if some previous are finished并行运行 100 个线程,如果之前的一些线程完成,则运行缺失的线程
【发布时间】:2011-07-27 09:14:39
【问题描述】:

例如,我需要始终运行 100 个线程来执行某些操作。 我有一个名为ThreadsWorker 的类,它会查找线程数并在之前的某些线程完成时运行丢失的线程。 所以,这是描述情况的表格:

1 second: 100 threads
2 second: 92 threads (ThreadsWorker generates new 8 threads)
3 second: 100 theads
4 second: 72 threads (ThreadsWorker generates 28 threads)

等等。 我的线程是匿名调用(只是new Thread(new Runnable(...)).start()),因为我不知道如何正确地将它们保存到Threads[] 数组,因为虽然ThreadsWorker 将保存threads[i] = new Threads(),但一些线程可能已经完成,然后会有一些与数组索引冲突。

由于匿名调用,我现在使用threadsCount 变量,并在线程主体开始时递增,在线程主体结束时递减(使用synchronized)。好的,它可以正常工作,我的唯一方法是使用while() 循环,当进度完成时检查threadsCount == 0

我认为这是 C 风格但不是 Java 方式 :) 那么,你能帮我用 Java 方式做吗?

【问题讨论】:

    标签: java multithreading parallel-processing


    【解决方案1】:

    如果您的目标只是让 100 个线程主动处理,我建议查看 Java thread pools(以及更普遍的 Executors)。

    我不清楚您是要让所有 100 个线程继续运行还是等待它们全部完成。您的问题同时引用了两者(ThreadsWorker 产生 28 个新线程,threadsCount==0),它们似乎相互矛盾。

    【讨论】:

    • +1 用于为此使用并发工具而不是原始线程。请在此处查看我的答案以获取一些样板代码。 stackoverflow.com/questions/5115940/…
    • 所以如果我要使用 «newFixedThreadPool()» 那么我应该使用一些 Worker 来无限地为池创建新线程到这个队列?
    【解决方案2】:

    将所有线程放入一个数组或集合中。

    然后循环遍历每个调用 Thread.join() 的集合。当这个循环完成时,所有线程都完成了。

    ArrayList threads = new ArrayList();
    for (int i = 0; i < 5; i++) {
      Thread t = new AweseomeThread();
      t.start();
      threads.add(t);
    }
    
    for (Thread t : threads) {
      t.join();
    }
    

    您还需要一些异常处理(例如 InterruptedException)。但是,我将把它作为练习留给读者...... :)

    【讨论】:

    • 是的,我知道 join 方法和异常 :) 但我不知道在这种情况下如何正确使用索引和线程数组。我的意思是,如果我将一些索引留空,因为 ThreadsWorker 会认为threadsCount = 15,但是当他运行和保存线程描述符时,另一个线程会死掉并且 f.e.线程 [71] 和线程 [75] 将带有描述符,但线程 [72] 到线程 [74] 将具有空描述符。所以我必须暂停所有线程,而 ThreadsWorker 会将索引保存到数组以避免空索引?
    • 我可能会建议不要尝试使用数组,而是使用集合。既然你提到了这一点,我已经重新阅读了你的问题,我误解了。我以为您只是想知道所有线程何时完成,但似乎您想保持 100 个线程处于活动状态。我有另一个想法......不过,为了清楚起见,我将作为单独的答案提交,以防我仍然误解你的问题。
    【解决方案3】:

    http://download.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html

    你可以试试CountDownLatchjdk api类

    private CountDownLatch latch;
    private static class SimpleThread extends Thread {
     public void run() {
      latch.countDown();
     }
    }
    public static void main(String[] args) {
     int threadcount = 10;
     latch = new CountDownLatch(threadcount);
     for (int i = 0; i < 10; i++) {
      Thread t = new SimpleThread();
      t.start();
     }
     // waiting threads all finished
     latch.await();
    }
    

    从Main类的latch获取线程数

    【讨论】:

    • 我认为这是等待另一个线程中的给定操作完成,但发帖人要求等待线程完成并返回,对吗? Thread.join() 不是更适合他的情况吗?
    • CountDownLatch 可以帮助 Main 等待所有线程完成,并缓存线程数。
    • 谢谢,我今天读到了关于 CountDownLatch 的文章,但我认为这对我的任务来说太庞大了(join 是一种绝妙的方法,但索引数组有一些问题)。而且可能不太正确。不过还是谢谢!
    • > «帮助 Main 等待所有线程完成,并缓存线程数» O!你能展示样品吗?之后我会考虑什么对我的情况更有利。
    【解决方案4】:

    我相信您正在尝试让 ThreadWorker 为所有已完成的线程提交新线程。

    我会使用 BlockingQueue,线程(您的 Runnable(s))在完成时添加到该队列中。 ThreadWorker 将等待一个线程完成,然后再启动一个新线程。

    public class YourRunnable implements Runnable {
      private final BlockingQueue<YourRunnable> queue;
      public YourRunnable(BlockingQueue<YourRunnable> queue){
        this.queue = queue;
      }
      public void run{
          // Your Code...
          // Finished Processing
          queue.add(this);
      }
    }
    public class ThreadWorkder implements Runnable { 
      private final BlockingQueue<YourRunnable> queue;
      ThreadWorker(BlockingQueue<YourRunnable> queue){
        this.queue = queue;
      }
      public void run{
        while(queue.take()){
           (new Thread(new YourRunnable(queue))).start();
        }
      }
      // general main method
      public static void main(String [] args){
        BlockingQueue<YourRunnable> queue = new LinkedBlockingQueue<YourRunnable>();
        ThreadWorker worker = new ThreadWorker(queue);
        Thread(worker).start();
        for (int i = 0; i < 100; i++){
          (new Thread(new YourRunnable(queue))).start();
        }
      }
    }
    

    【讨论】:

      【解决方案5】:

      使用集合而不是数组。线程完成后,让它们从数组中删除。像这样的:

      public class Foo {
        Vector<Thread> threads = new Vector<Thread>(); //Vector is threadsafe
      
        public ensureThreadCount(int count) {
          while (threads.size() < count) {
            Thread t = new AweseomeThread(threads);
            threads.add(t);
            t.start();
          }
        }
      }
      
      public class AwesomeThread {
        Collection threads;
        public AwesomeThread(Collection threads) {
          this.threads = threads;
        }
      
        public void run() {
          try {
            // do stuff
          } catch (Throwable t) {
          } finally {
            threads.remove(this);
          }
        }
      }
      

      然后,让您的工作人员调用 Foo.ensureThreadCount()。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-04-09
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-11-16
        相关资源
        最近更新 更多