【问题标题】:How is CountDownLatch used in Java Multithreading?Java多线程中如何使用CountDownLatch?
【发布时间】:2023-03-21 23:56:01
【问题描述】:

谁能帮我理解 Java CountDownLatch 是什么以及何时使用它?

我对这个程序的工作原理不是很清楚。据我了解,所有三个线程同时启动,每个线程将在 3000 毫秒后调用 CountDownLatch。所以倒计时会一一递减。锁存器变为零后,程序会打印“已完成”。也许我理解的方式是不正确的。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Processor implements Runnable {
    private CountDownLatch latch;

    public Processor(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        System.out.println("Started.");

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        latch.countDown();
    }
}

// --------------------------------------------- --------

public class App {

    public static void main(String[] args) {

        CountDownLatch latch = new CountDownLatch(3); // coundown from 3 to 0

        ExecutorService executor = Executors.newFixedThreadPool(3); // 3 Threads in pool

        for(int i=0; i < 3; i++) {
            executor.submit(new Processor(latch)); // ref to latch. each time call new Processes latch will count down by 1
        }

        try {
            latch.await();  // wait until latch counted down to 0
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Completed.");
    }

}

【问题讨论】:

  • 我刚刚将您的问题示例代码用于 android 并行服务批处理,它就像一个魅力。非常感谢!
  • 从 2012 年的 this video 获得这里,这与此处显示的示例非常相似。对于任何感兴趣的人,这是来自一个名叫 John 的人的 Java 多线程教程系列的一部分。我喜欢约翰。强烈推荐。

标签: java multithreading countdown countdownlatch


【解决方案1】:

是的,你理解正确。 CountDownLatch 工作原理是闩锁,主线程会一直等到门打开。一个线程等待 n 个线程,在创建 CountDownLatch 时指定。

任何调用CountDownLatch.await() 的线程,通常是应用程序的主线程,都会等待计数达到零或被另一个线程中断。一旦完成或准备好,所有其他线程都需要通过调用CountDownLatch.countDown() 来倒计时。

一旦计数达到零,等待线程继续。 CountDownLatch 的缺点/优点之一是它不可重复使用:一旦计数达到零,您就不能再使用 CountDownLatch

编辑:

当一个线程(如主线程)需要等待一个或多个线程完成才能继续处理时,使用CountDownLatch

在 Java 中使用CountDownLatch 的经典示例是使用服务架构的服务器端核心 Java 应用程序,其中多个服务由多个线程提供,并且在所有服务都成功启动之前应用程序无法开始处理。

附: OP 的问题有一个非常简单的例子,所以我没有包括一个。

【讨论】:

  • 感谢您的回复。你能给我一个应用倒计时闩锁的例子吗?
  • CountDownLatch的使用教程在这里howtodoinjava.com/2013/07/18/…
  • @NikolaB 但是在这个给定的例子中,我们可以通过使用 join 方法获得相同的结果,不是吗?
  • 我认为不可重用性是一个优势:您确定没有人可以重置它或增加计数。
  • 很好的解释。但我对One thread waits for n number of threads specified while creating CountDownLatch in Java这一点略有不同意见。如果你需要这样的机制,那么谨慎使用CyclicBarrierJava concurrency in Practice 中给出的这两者之间的基本概念区别是:Latches are for waiting for events; barriers are for waiting for other threadscyclicBarrier.await() 进入阻塞状态。
【解决方案2】:

Java 中的CountDownLatch 是一种同步器,它允许一个Thread 在开始处理之前等待一个或多个Threads。

CountDownLatch 的工作原理是闩锁,线程将等待直到门打开。一个线程等待n 创建CountDownLatch 时指定的线程数。

例如final CountDownLatch latch = new CountDownLatch(3);

这里我们将计数器设置为 3。

调用CountDownLatch.await() 的任何线程,通常是应用程序的主线程,将等待计数达到零或被另一个Thread 中断。一旦完成或准备好工作,所有其他线程都需要通过调用CountDownLatch.countDown() 进行倒计时。一旦计数达到零,Thread 等待开始运行。

这里的计数被CountDownLatch.countDown() 方法递减。

调用await()方法的Thread会一直等到初始计数归零。

要使计数为零,其他线程需要调用countDown() 方法。 一旦计数变为零,调用await() 方法的线程将恢复(开始执行)。

CountDownLatch 的缺点是它不可重复使用:一旦计数变为零,它就不再可用。

【讨论】:

  • 我们是否使用 new CountDownLatch(3),因为我们定义了来自 newFixedThreadPool 的 3 个线程?
  • “开始处理之前”不应该是“继续处理之前”吗?
  • @Arefe 是的,这是通过您的代码块的线程数
【解决方案3】:

NikolaB 解释的很好,但是例子会有助于理解,所以这里是一个简单的例子...

 import java.util.concurrent.*;


  public class CountDownLatchExample {

  public static class ProcessThread implements Runnable {

    CountDownLatch latch;
    long workDuration;
    String name;

    public ProcessThread(String name, CountDownLatch latch, long duration){
        this.name= name;
        this.latch = latch;
        this.workDuration = duration;
    }


    public void run() {
        try {
            System.out.println(name +" Processing Something for "+ workDuration/1000 + " Seconds");
            Thread.sleep(workDuration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name+ "completed its works");
        //when task finished.. count down the latch count...

        // basically this is same as calling lock object notify(), and object here is latch
        latch.countDown();
    }
}


public static void main(String[] args) {
    // Parent thread creating a latch object
    CountDownLatch latch = new CountDownLatch(3);

    new Thread(new ProcessThread("Worker1",latch, 2000)).start(); // time in millis.. 2 secs
    new Thread(new ProcessThread("Worker2",latch, 6000)).start();//6 secs
    new Thread(new ProcessThread("Worker3",latch, 4000)).start();//4 secs


    System.out.println("waiting for Children processes to complete....");
    try {
        //current thread will get notified if all chidren's are done 
        // and thread will resume from wait() mode.
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("All Process Completed....");

    System.out.println("Parent Thread Resuming work....");



     }
  }

【讨论】:

    【解决方案4】:

    当我们想要等待多个线程完成其任务时使用它。类似于线程中的join。

    我们可以在哪里使用 CountDownLatch

    考虑一个场景,我们有三个线程“A”、“B”和“C”,并且我们希望仅在“A”和“B”线程完成或部分完成它们的线程时启动线程“C”任务。

    可应用于现实世界的 IT 场景

    假设经理在开发团队(A 和 B)之间划分模块,并且他希望仅在两个团队都完成任务时将其分配给 QA 团队进行测试。

    public class Manager {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(2);
            MyDevTeam teamDevA = new MyDevTeam(countDownLatch, "devA");
            MyDevTeam teamDevB = new MyDevTeam(countDownLatch, "devB");
            teamDevA.start();
            teamDevB.start();
            countDownLatch.await();
            MyQATeam qa = new MyQATeam();
            qa.start();
        }   
    }
    
    class MyDevTeam extends Thread {   
        CountDownLatch countDownLatch;
        public MyDevTeam (CountDownLatch countDownLatch, String name) {
            super(name);
            this.countDownLatch = countDownLatch;       
        }   
        @Override
        public void run() {
            System.out.println("Task assigned to development team " + Thread.currentThread().getName());
            try {
                    Thread.sleep(2000);
            } catch (InterruptedException ex) {
                    ex.printStackTrace();
            }
        System.out.println("Task finished by development team Thread.currentThread().getName());
                this.countDownLatch.countDown();
        }
    }
    
    class MyQATeam extends Thread {   
        @Override
        public void run() {
            System.out.println("Task assigned to QA team");
            try {
                    Thread.sleep(2000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            System.out.println("Task finished by QA team");
        }
    }
    

    上述代码的输出将是:

    分配给开发团队 devB 的任务

    分配给开发团队 devA 的任务

    开发团队 devB 完成的任务

    开发团队 devA 完成的任务

    分配给 QA 团队的任务

    QA 团队完成的任务

    这里await()方法等待countdownlatch标志变为0,countDown()方法将countdownlatch标志减1。

    JOIN 的限制: 上面的例子也可以用JOIN来实现,但是JOIN不能用于两种场景:

    1. 当我们使用 ExecutorService 而不是 Thread 类来创建线程时。
    2. 修改上面的示例,其中经理希望在开发完成 80% 的任务后立即将代码移交给 QA 团队。这意味着 CountDownLatch 允许我们修改可用于等待另一个线程部分执行的实现。

    【讨论】:

      【解决方案5】:

      CoundDownLatch 使您可以让一个线程等待,直到所有其他线程完成执行。

      伪代码可以是:

      // Main thread starts
      // Create CountDownLatch for N threads
      // Create and start N threads
      // Main thread waits on latch
      // N threads completes there tasks are returns
      // Main thread resume execution
      

      【讨论】:

      • 您可能希望从代码块中移出所有描述
      • 最好的评论。我喜欢这些“直截了当”的 cmets,而不是理论解释。
      【解决方案6】:

      正如 JavaDoc (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html) 中提到的,CountDownLatch 是一种同步辅助,在 Java 5 中引入。这里的同步并不意味着限制对关键部分的访问。而是对不同线程的操作进行排序。 通过 CountDownLatch 实现的同步类型与 Join 类似。 假设有一个线程“M”需要等待其他工作线程“T1”、“T2”、“T3”完成其任务 在 Java 1.5 之前,可以这样做的方式是,M 运行以下代码

          T1.join();
          T2.join();
          T3.join();
      

      上面的代码确保线程 M 在 T1、T2、T3 完成它的工作之后恢复它的工作。 T1、T2、T3可以任意顺序完成工作。 同样可以通过 CountDownLatch 实现,其中 T1、T2、T3 和线程 M 共享同一个 CountDownLatch 对象。
      “M”请求: countDownLatch.await();
      其中 "T1","T2","T3" 是 countDownLatch.countdown();

      join 方法的一个缺点是 M 必须知道 T1、T2、T3。如果稍后添加了新的工作线程 T4,那么 M 也必须意识到它。使用 CountDownLatch 可以避免这种情况。 执行后的动作顺序为[T1,T2,T3](T1,T2,T3的顺序可以是反正)-> [M]

      【讨论】:

        【解决方案7】:

        使用 Java Simple Serial Connector 访问串行端口是何时使用此类功能的一个很好的例子。通常,您将向端口写入一些内容,并且异步地,在另一个线程上,设备将响应 SerialPortEventListener。通常,您需要在写入端口后暂停以等待响应。手动处理这种情况下的线程锁非常棘手,但使用 Countdownlatch 很容易。在你想你可以用另一种方式做之前,小心你从未想过的比赛条件!

        伪代码:

        CountDownLatch latch;
        void writeData() { 
           latch = new CountDownLatch(1);
           serialPort.writeBytes(sb.toString().getBytes())
           try {
              latch.await(4, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
           }
        }
        class SerialPortReader implements SerialPortEventListener {
            public void serialEvent(SerialPortEvent event) {
                if(event.isRXCHAR()){//If data is available
                    byte buffer[] = serialPort.readBytes(event.getEventValue());
                    latch.countDown();
                 }
             }
        }
        

        【讨论】:

          【解决方案8】:

          如果您在调用latch.countDown() 之后添加一些调试,这可能会帮助您更好地理解它的行为。

          latch.countDown();
          System.out.println("DONE "+this.latch); // Add this debug
          

          输出将显示 Count 正在递减。这个“计数”实际上是您启动的可运行任务(处理器对象)的数量,countDown() 被调用,因此在调用latch.await( )。

          DONE java.util.concurrent.CountDownLatch@70e69696[Count = 2]
          DONE java.util.concurrent.CountDownLatch@70e69696[Count = 1]
          DONE java.util.concurrent.CountDownLatch@70e69696[Count = 0]
          

          【讨论】:

            【解决方案9】:

            来自关于CountDownLatch的oracle文档:

            一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

            CountDownLatch 使用给定的计数进行初始化。 await 方法会阻塞,直到由于调用countDown() 方法而使当前计数达到零,之后所有等待的线程都被释放,任何后续的 await 调用都会立即返回。这是一次性现象——计数无法重置。

            CountDownLatch 是一种多功能同步工具,可用于多种用途。

            使用计数为 1 初始化的 CountDownLatch 用作简单的开/关锁存器或门:所有调用 await 的线程在门处等待,直到它被调用 countDown() 的线程打开。

            初始化为 N 的CountDownLatch 可用于使一个线程等待,直到 N 个线程完成某个动作,或者某个动作已完成 N 次。

            public void await()
                       throws InterruptedException
            

            导致当前线程等待直到锁存器倒计时到零,除非线程被中断。

            如果当前计数为零,则此方法立即返回。

            public void countDown()
            

            减少锁存器的计数,如果计数达到零,则释放所有等待的线程。

            如果当前计数大于零,则递减。如果新计数为零,则重新启用所有等待线程以进行线程调度。

            你的例子的解释。

            1. 您已将 latch 变量的计数设置为 3

              CountDownLatch latch = new CountDownLatch(3);
              
            2. 您已将此共享latch 传递给工作线程:Processor

            3. Processor 的三个Runnable 实例已提交给ExecutorService executor
            4. 主线程(App)正在等待计数变为零,下面的语句

               latch.await();  
              
            5. Processor 线程休眠 3 秒,然后使用 latch.countDown() 递减计数值
            6. 由于latch.countDown(),第一个Process 实例将在完成后将锁存计数更改为2。

            7. 由于latch.countDown(),第二个Process 实例将在完成后将锁存计数更改为1。

            8. 由于latch.countDown(),第三个Process 实例将在完成后将锁存计数更改为0。

            9. 锁存器计数为零导致主线程Appawait出来

            10. 应用程序现在打印此输出:Completed

            【讨论】:

              【解决方案10】:

              Java Doc 的这个例子帮助我清楚地理解了这些概念:

              class Driver { // ...
                void main() throws InterruptedException {
                  CountDownLatch startSignal = new CountDownLatch(1);
                  CountDownLatch doneSignal = new CountDownLatch(N);
              
                  for (int i = 0; i < N; ++i) // create and start threads
                    new Thread(new Worker(startSignal, doneSignal)).start();
              
                  doSomethingElse();            // don't let run yet
                  startSignal.countDown();      // let all threads proceed
                  doSomethingElse();
                  doneSignal.await();           // wait for all to finish
                }
              }
              
              class Worker implements Runnable {
                private final CountDownLatch startSignal;
                private final CountDownLatch doneSignal;
                Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
                   this.startSignal = startSignal;
                   this.doneSignal = doneSignal;
                }
                public void run() {
                   try {
                     startSignal.await();
                     doWork();
                     doneSignal.countDown();
                   } catch (InterruptedException ex) {} // return;
                }
              
                void doWork() { ... }
              }
              

              视觉解读:

              显然,CountDownLatch 允许一个线程(此处为 Driver)等到一堆正在运行的线程(此处为 Worker)执行完毕。

              【讨论】:

                【解决方案11】:
                package practice;
                
                import java.util.concurrent.CountDownLatch;
                
                public class CountDownLatchExample {
                
                    public static void main(String[] args) throws InterruptedException {
                        CountDownLatch c= new CountDownLatch(3);  // need to decrements the count (3) to zero by calling countDown() method so that main thread will wake up after calling await() method 
                        Task t = new Task(c);
                        Task t1 = new Task(c);
                        Task t2 = new Task(c);
                        t.start();
                        t1.start();
                        t2.start();
                        c.await(); // when count becomes zero main thread will wake up 
                        System.out.println("This will print after count down latch count become zero");
                    }
                }
                
                class Task extends Thread{
                    CountDownLatch c;
                
                    public Task(CountDownLatch c) {
                        this.c = c;
                    }
                
                    @Override
                    public void run() {
                        try {
                            System.out.println(Thread.currentThread().getName());
                            Thread.sleep(1000);
                            c.countDown();   // each thread decrement the count by one 
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                

                【讨论】:

                  【解决方案12】:

                  此链接CountDownLatchExample中解释的countDownLatch的最佳实时示例

                  【讨论】:

                    【解决方案13】:

                    最好的选择是CyclicBarrier,根据https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html 见:

                    CountDownLatch 使用给定的计数进行初始化。由于调用了 countDown() 方法,等待方法一直阻塞,直到当前计数达到零,之后所有等待的线程都被释放,任何后续的 await 调用立即返回。这是一次性现象——计数无法重置。如果您需要重置计数的版本,请考虑使用 CyclicBarrier。

                    【讨论】:

                      猜你喜欢
                      • 1970-01-01
                      • 1970-01-01
                      • 1970-01-01
                      • 2015-06-08
                      • 1970-01-01
                      • 1970-01-01
                      • 1970-01-01
                      • 1970-01-01
                      相关资源
                      最近更新 更多