【问题标题】:Java looping Threads using CyclicBarrier使用 CyclicBarrier 的 Java 循环线程
【发布时间】:2014-06-22 02:27:14
【问题描述】:

我有一个具有这种一般结构的程序:

init
create CyclicBarrier
initialise all threads, attaching to barrier
*start all threads*
wait for join
display stats


*start all threads*
perform calculation
await barrier

我的问题是我需要线程的 run() 方法继续循环直到满足某个条件,但在每次迭代后暂停以让所有线程同步。

我已经尝试将 Runnable 方法附加到屏障,但这最终需要重新创建和重新启动每个线程,这不是一个很好的解决方案。

我也尝试过使用 CyclicBarrier 的 reset() 方法,但这似乎只会导致现有线程出错,即使在所有线程完成后执行也是如此。

我的问题是:

-是否可以“重置”一个屏障并让所有屏障线程遵循与第一次调用 await() 之前相同的条件?

-或者我应该使用另一种方法来实现这一点吗?

提前致谢

【问题讨论】:

  • while(true) 粘贴到Thread 中。 await() 循环结束处的屏障。当所有线程都完成后,reset() 屏障和整个过程重新开始......

标签: java multithreading wait barrier cyclicbarrier


【解决方案1】:

barrier.wait() 将暂停线程。屏障已经在主线程中,不需要另一个。在上面的算法中,您显示了在显示统计信息后重新启动的线程。您不需要这样做。如果最近唤醒的线程处于循环中,它们将再次回到barrier.wait()。

【讨论】:

    【解决方案2】:

    根据@Totoro 的回答,下面是一些示例代码,其中还包含要求“我需要线程的 run() 方法保持循环直到满足某个条件,每次迭代后暂停以让所有线程同步”。这使得它很快变得复杂,但希望程序输出能够澄清示例代码(或者我应该制作更好的示例)。

    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class BarrierCalc implements Runnable {
    
    public static final int CALC_THREADS = 3;
    
    private static final AtomicBoolean runCondition = new AtomicBoolean();
    private static final AtomicBoolean stopRunning = new AtomicBoolean();
    
    public static void main(String[] args) {
    
        CyclicBarrier barrier = new CyclicBarrier(CALC_THREADS + 1);
        for (int i = 0; i < CALC_THREADS; i++) {
             new Thread(new BarrierCalc(barrier)).start();
        }
        try {
            runCondition.set(true);
            barrier.await();
            showln(0, "STATS!");
    
            barrier.await();
            showln(0, "start looping 1");
            Thread.sleep(200);
            runCondition.set(false);
            showln(0, "stop looping 1");
            barrier.await();
            runCondition.set(true);
    
            barrier.await();
            showln(0, "start looping 2");
            Thread.sleep(100);
            runCondition.set(false);
            showln(0, "stop looping 2");
            barrier.await();
    
            stopRunning.set(true);
            showln(0, "finishing");
            barrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private static final AtomicInteger calcId = new AtomicInteger();
    
    private CyclicBarrier barrier;
    private int id;
    
    public BarrierCalc(CyclicBarrier barrier) {
        this.barrier = barrier;
        id = calcId.incrementAndGet();
    }
    
    public void run() {
    
        showln(id, "waiting for start");
        try {
            barrier.await(); // display stats
            barrier.await(); // start running
            int loopNumber = 0;
            while (!stopRunning.get()) {
                showln(id, "looping " + (++loopNumber));
                while (runCondition.get()) {
                    Thread.sleep(10); // simulate looping
                }
                showln(id, "synchronizing " + loopNumber);
                barrier.await();
                showln(id, "synchronized " + loopNumber);
                // give main thread a chance to set stopCondition and runCondition
                barrier.await();
            }
            showln(id, "finished");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private static final long START_TIME = System.currentTimeMillis();
    
    public static void showln(int id, String msg) {
        System.out.println((System.currentTimeMillis() - START_TIME) + "\t ID " + id + ": " + msg);
    }
    
    }
    

    请记住,程序输出的顺序可能不符合预期:同时写入一个同步输出 (System.out) 的线程会以随机顺序获得写入访问权限。

    【讨论】:

      【解决方案3】:

      您可以看一下我使用 CyclicBarrier 的示例。这里每个工人进行一些计算,并在屏障处检查条件。如果满足条件,则所有worker停止计算,否则继续:

      class Solver {
          private static final int REQUIRED_AMOUNT = 100;
          private static final int NUMBER_OF_THREADS = 4;
      
          AtomicInteger atomicInteger = new AtomicInteger();
          AtomicBoolean continueCalculation = new AtomicBoolean(true);
          final CyclicBarrier barrier;
      
          public static void main(String[] args) {
              new Solver();
          }
      
          class Worker implements Runnable {
              int workerId;
              Worker(int workerId) {
                  this.workerId = workerId;
              }
      
              public void run() {
                  try {
                      while(continueCalculation.get()) {
                          calculate(workerId);
                          barrier.await();
                      }
      
                  } catch (Exception ex) {
                      System.out.println("Finishing " + workerId);
                  }
              }
          }
      
          public Solver() {
              Runnable barrierAction = () -> {
                  if (done()) {
                      continueCalculation.set(false);
                  }
              };
      
              barrier = new CyclicBarrier(NUMBER_OF_THREADS, barrierAction);
      
              List<Thread> threads = new ArrayList(NUMBER_OF_THREADS);
              for (int i = 0; i < NUMBER_OF_THREADS; i++) {
                  Thread thread = new Thread(new Worker(i));
                  threads.add(thread);
                  thread.start();
              }
          }
      
          private void calculate(int workerId) throws InterruptedException {
              // Some long-running calculation
              Thread.sleep(2000L);
              int r = new Random().nextInt(12);
      
              System.out.println("Worker #" + workerId + " added " + r +" = " + atomicInteger.addAndGet(r));
          }
      
          private boolean done() {
              int currentResult = atomicInteger.get();
              boolean collected = currentResult >= REQUIRED_AMOUNT;
      
              System.out.println("=======================================================");
              System.out.println("Checking state at the barrier: " + currentResult);
              if (collected) {
                  System.out.println("Required result is reached");
              }
              System.out.println("=======================================================");
      
              return collected;
          }
      }
      

      【讨论】:

        猜你喜欢
        • 2014-07-29
        • 2014-08-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-08-27
        • 2013-12-24
        • 2012-03-28
        • 1970-01-01
        相关资源
        最近更新 更多