【问题标题】:Electing a thread for barrier action execution - Java CyclicBarrier为屏障动作执行选择一个线程 - Java CyclicBarrier
【发布时间】:2012-01-27 01:20:14
【问题描述】:

查看 CyclicBarrier 的 javadocs,我在类文档中发现了以下我不完全理解的语句。来自javadoc

如果屏障动作在执行时不依赖于被挂起的各方,那么该方中的任何线程都可以在释放时执行该动作。为了促进这一点,每次调用 await() 都会返回该线程在屏障处的到达索引。然后你可以选择哪个线程应该执行屏障动作,例如:

if (barrier.await() == 0) {
  // log the completion of this iteration
} 

一旦各方都调用了 .await() ,有人可以解释如何指定一个特定的线程来执行屏障动作,并可能提供一个例子吗?

【问题讨论】:

  • 我猜你指定为“action”线程的线程将在所有线程完成后立即处理额外的代码,并在barrier.await() 之后开始执行。这是非常危险的,因为您必须确保其他(正在运行的)线程不会接触到“动作”代码正在操作的数据。

标签: java concurrency


【解决方案1】:

CyclicBarrier 可以通过 ORDER 指定线程:

如您所说,如果您将屏障完成逻辑包含在特定于线程索引的条件中,则可以指定以特定顺序返回的线程。因此,您的上述实现将根据您引用的文档进行。

但是,这里的混淆点是文档是根据返回屏障的顺序来讨论线程标识,而不是线程对象标识。因此,线程 0 指的是第 0 个要完成的线程。

替代方案:使用其他机制指定线程。

如果您希望在其他工作完成后让特定线程执行特定操作,您可以使用不同的机制 - 例如 semaphore 。如果你想要这种行为,你可能并不真的需要循环障碍。

要检查文档的含义,请运行下面的类(从 http://programmingexamples.wikidot.com/cyclicbarrier 修改),我在其中合并了您的 sn-p。

CyclicBarrier 文档的含义示例

包线程; 导入 java.util.concurrent.BrokenBarrierException; 导入 java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample
{
    private static int matrix[][] = 
    { 
        { 1 }, 
        { 2, 2 }, 
        { 3, 3, 3 },
        { 4, 4, 4, 4 }, 
        { 5, 5, 5, 5, 5 } };

    static final int rows = matrix.length;
    private static int results[]=new int[rows];


    static int threadId=0;
    private static class Summer extends Thread
    {
        int row;

        CyclicBarrier barrier;

        Summer(CyclicBarrier barrier, int row)
        {
            this.barrier = barrier;
            this.row = row;
        }

        public void run()
        {
            int columns = matrix[row].length;
            int sum = 0;
            for (int i = 0; i < columns; i++)
            {
                sum += matrix[row][i];
            }
            results[row] = sum;
            System.out.println("Results for row " + row + " are : " + sum);
            // wait for the others 
            // Try commenting the below block, and watch what happens. 
            try
            {
                int w = barrier.await();
                if(w==0)
                {
                    System.out.println("merging now !");
                    int fullSum = 0;
                    for (int i = 0; i < rows; i++)
                    {

                        fullSum += results[i];
                    }
                    System.out.println("Results are: " + fullSum);
                }
            }
            catch(Exception e)
            {
                e.printStackTrace();
            }
        }
    }
    public static void main(String args[])
    {
        /*
         * public CyclicBarrier(int parties,Runnable barrierAction)
         * Creates a new CyclicBarrier that will trip when the given number
         * of parties (threads) are waiting upon it, and which will execute 
         * the merger task when the barrier is tripped, performed 
         * by the last thread entering the barrier.
         */
        CyclicBarrier barrier = new CyclicBarrier(rows );
        for (int i = 0; i < rows; i++)
        {
            System.out.println("Creating summer " + i);
            new Summer(barrier, i).start();

        }
        System.out.println("Waiting...");
    }
}

【讨论】:

  • 如果我在 spring 中有 CyclicBarrierExample 作为 bean 之一,并且 static results[] 与类相关联,并且由于 JVM 中的缓存行为而在多个线程之间共享。这不会是个问题吗?
【解决方案2】:

好的,假设 RuPaul 想要一些工作线程,但只有完成的第三个线程应该执行屏障任务(比如“Sashay,Chante”)。

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class Main
{

   private static class Worker implements Runnable {

      private CyclicBarrier barrier;

      public Worker(CyclicBarrier b) {
         barrier = b;
      }

      public void run() {
         final String threadName = Thread.currentThread().getName();

         System.out.printf("%s:  You better work!%n", threadName);
         // simulate the workin' it part
         Random rnd = new Random();
         int secondsToWorkIt = rnd.nextInt(10) + 1;

         try {
            TimeUnit.SECONDS.sleep(secondsToWorkIt);
         } catch (InterruptedException ex) { /* ...*/ }

         System.out.printf("%s worked it, girl!%n", threadName);

         try {
            int n = barrier.await();
            final int myOrder = barrier.getParties() - n;
            System.out.printf("Turn number: %s was %s%n", myOrder, threadName);

            // MAGIC CODE HERE!!!
            if (myOrder == 3) { // the third one that finished
               System.out.printf("%s: Sashay Chante!%n", myOrder);
            }
            // END MAGIC CODE
         }
         catch (BrokenBarrierException ex) { /* ... */ }
         catch (InterruptedException ex) { /* ... */ }
      }
   }

   private final int numThreads = 5;

   public void work() {
      /*
       * I want the 3rd thread that finished to say "Sashay Chante!"
       * when everyone has called await.
       * So I'm not going to put my "barrier action" in the CyclicBarrier constructor,
       * where only the last thread will run it! I'm going to put it in the Runnable
       * that calls await.
       */
      CyclicBarrier b = new CyclicBarrier(numThreads);

      for (int i= 0; i < numThreads; i++) {
         Worker task = new Worker(b);
         Thread thread = new Thread(task);
         thread.start();
      }
   }

   public static void main(String[] args)
   {
      Main main = new Main();
      main.work();
   }

}

这是一个输出示例:

Thread-0:  You better work!
Thread-4:  You better work!
Thread-2:  You better work!
Thread-1:  You better work!
Thread-3:  You better work!
Thread-1 worked it, girl!
Thread-4 worked it, girl!
Thread-0 worked it, girl!
Thread-3 worked it, girl!
Thread-2 worked it, girl!
Turn number: 5 was Thread-2
Turn number: 3 was Thread-0
3: Sashay Chante!
Turn number: 1 was Thread-1
Turn number: 4 was Thread-3
Turn number: 2 was Thread-4

如您所见,第 3 名完成的线程是 Thread-0,因此 Thread-0 是执行“屏障动作”的线程。

假设你可以命名你的线程:

thread.setName("My Thread " + i);

然后你可以在那个名字的线程上执行操作......我不知道这对你来说有多可行。

【讨论】:

  • 很棒的例子。有人看过我的 Birdcage DVD 吗?
【解决方案3】:

我认为文档的该部分是关于屏障操作Runnable替代,而不是使用它的特定方式。注意它是怎么说的(强调我的):

如果障碍行动在执行时不依赖于被暂停的当事人

如果您将屏障动作指定为可运行,那么它...

每个屏障点运行一次,在队伍中的最后一个线程到达之后,但在任何线程被释放之前

所以,当线程被挂起时(尽管它是由最后一个到达的线程运行的,所以那个线程没有被挂起;但至少它的正常执行流程被挂起,直到屏障操作完成)。

使用await() 的返回值的业务是在线程挂起时不需要执行操作时可以做的事情。

文档中的示例是指示性的。使用Runnable 屏障操作的示例正在协调一些其他线程的工作 - 合并行并检查工作是否完成。其他线程需要等待它知道他们是否还有更多工作要做。因此,它必须在它们被暂停时运行。使用来自await() 的返回值的示例是一些日志记录。其他线程不依赖于已完成的日志记录。因此,它可能会在其他线程开始做更多工作时发生。

【讨论】:

    猜你喜欢
    • 2020-02-23
    • 2014-05-11
    • 2022-01-02
    • 2014-07-30
    • 1970-01-01
    • 2014-08-23
    • 2018-12-20
    • 2014-06-22
    • 1970-01-01
    相关资源
    最近更新 更多