【问题标题】:How to synchronize/lock correctly when using CountDownLatch使用 CountDownLatch 时如何正确同步/锁定
【发布时间】:2010-12-04 09:21:23
【问题描述】:

归结为一个线程通过某个服务提交作业。 Job 在一些 TPExecutor 中执行。之后,此服务在某些条件下(作业超过最大重试次数等)检查结果并在原始线程中抛出异常。下面的代码 sn-p 大致说明了遗留代码中的这种情况:

import java.util.concurrent.CountDownLatch;

public class IncorrectLockingExample {

private static class Request {

    private final CountDownLatch latch = new CountDownLatch(1);

    private Throwable throwable;

    public void await() {
        try {
            latch.await();
        } catch (InterruptedException ignoredForDemoPurposes) {
        }
    }

    public void countDown() {
        latch.countDown();
    }

    public Throwable getThrowable() {
        return throwable;
    }

    public void setThrowable(Throwable throwable) {
        this.throwable = throwable;
    }

}

private static final Request wrapper = new Request();

public static void main(String[] args) throws InterruptedException {

    final Thread blockedThread = new Thread() {
        public void run() {
            wrapper.await();
            synchronized (wrapper) {
                if (wrapper.getThrowable() != null)
                    throw new RuntimeException(wrapper.getThrowable());
            }
        }
    };

    final Thread workingThread = new Thread() {
        public void run() {
            wrapper.setThrowable(new RuntimeException());
            wrapper.countDown();

        }
    };

    blockedThread.start();
    workingThread.start();

    blockedThread.join();
    workingThread.join();
}

}

有时,(在我的机器上无法重现,但在 16 核服务器机器上发生)异常不会被报告给原始线程。我认为这是因为没有强制发生之前发生(例如,'countDown' 发生在 'setThrowable' 之前)并且程序继续工作(但应该失败)。 我将不胜感激有关如何解决此案的任何帮助。 限制条件是:一周内发布,需要对现有代码库的影响最小。

【问题讨论】:

  • 250 KLOC 项目在这里完全多线程,在 16 核等上工作。我们使用“高级”多线程工具,如 CountDownLatch 一个 lot。我们使用 Objectwait() 方法和 Threadjoin() 等低级事物的次数i> 方法? 。在我看来,现在默认 API 中有足够的高级并发设施,您不需要重新发明基于 Java 特质的任何损坏的轮子。 +1 彼得劳里的回答。
  • @Webinator:OP is 使用“高级”CountDownLatch 工具来实现其设计目的之一。
  • 您确定上面的代码没有按预期运行吗?在您进行更正后,我认为没有理由不这样做。
  • 我没有意识到 wait() 是一个坏轮子/

标签: java multithreading locking blocking


【解决方案1】:

上面的代码(现在已更新)应该可以按您的预期工作,而无需使用进一步的同步机制。内存屏障及其对应的 'happens-before' 关系通过使用 CountDownLatch await()countdown() 方法强制执行。

来自API docs

“释放”同步器方法(例如 Lock.unlock、Semaphore.release 和 CountDownLatch.countDown)之前的操作发生在成功“获取”方法(例如 Lock.lock、Semaphore.acquire)之后的操作之前, Condition.await 和 CountDownLatch.await 在另一个线程中的同一个同步器对象上。

如果您经常处理并发问题,请为自己准备一份 'Java Concurrency in Practice',它是 Java 并发圣经,非常值得放在您的书架上 :-)。

【讨论】:

  • CountDownLatch await/countDown 保证发生之前(感谢您指向 API 文档,我不知道这个事实)。然而,happens-before 本身并不能保证在 CountDownLatch.countDown 之前所做的更改的可见性。因此,唯一的解决方案是将 throwable 设置为 volatile,或者在 'synchronized (throwable) {...}' 块中进行更新。这有意义吗
  • @Petro Semeniuk:不,happens-before 关系保证了所有先前写入在内存屏障中的可见性,即在countDown() 之前在workingThread 中所做的任何事情都将在blockedThread 之后可见对应的await() 已返回。仅出于创建内存屏障的目的使用synchronized 被认为是不好的(这就是volatile 的用途)并且无论如何这里不需要volatile,因为使用CountDownLatch 方法的线程同步已经创建了所需的内存障碍和发生之前的关系。
【解决方案2】:

我怀疑你需要

private volatile Throwable throwable

您是否尝试过使用内置的 ExecutorService 并为您执行此操作。以下打印件

future1 := result
future2  threw java.lang.IllegalStateException
future3  timed out

代码是

public static void main(String... args)  {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future1 = executor.submit(new Callable<String>() {
        public String call() throws Exception {
            return "result";
        }
    });

    Future<String> future2 = executor.submit(new Callable<String>() {
        public String call() throws Exception {
            throw new IllegalStateException();
        }
    });

    Future<String> future3 = executor.submit(new Callable<String>() {
        public String call() throws Exception {
            Thread.sleep(2000);
            throw new AssertionError();
        }
    });

    printResult("future1", future1);
    printResult("future2", future2);
    printResult("future3", future3);
    executor.shutdown();
}

private static void printResult(String description, Future<String> future) {
    try {
        System.out.println(description+" := "+future.get(1, TimeUnit.SECONDS));
    } catch (InterruptedException e) {
        System.out.println(description+"  interrupted");
    } catch (ExecutionException e) {
        System.out.println(description+"  threw "+e.getCause());
    } catch (TimeoutException e) {
        System.out.println(description+"  timed out");
    }
}

在 FutureTask 的代码中,有一个注释。

/**
 * The thread running task. When nulled after set/cancel, this
 * indicates that the results are accessible.  Must be
 * volatile, to ensure visibility upon completion.
 */

如果您不打算重用 JDK 中的代码,它仍然值得一读,以便您了解他们使用的任何技巧。

【讨论】:

  • +1 表示ExecutorService 建议,但volatile 关键字在这里没有任何用处。
  • 感谢您提供 FutureTask 的示例。看起来我们将来必须升级到的(而不是使用 CountDownLatch 和 throwable 的组合)。不幸的是,在保证发生之前,我的情况略有不同。如果 FutureTask 'volatile' 需要运行器,因为可调用(innerRunAndReset 方法)不能保证发生之前。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-10-23
  • 2016-07-08
  • 2021-02-04
  • 2011-11-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多