【问题标题】:Atomic compareAndSet but with callback?原子 compareAndSet 但有回调?
【发布时间】:2015-10-21 16:43:30
【问题描述】:

我知道AtomicReferencecompareAndSet,但我觉得我想做的就是这个

private final AtomicReference<Boolean> initialized = new AtomicReference<>( false );
...

atomicRef.compareSetAndDo( false, true, () -> {
  // stuff that only happens if false
});

这可能也可以,可能会更好。

atomicRef.compareAndSet( false, () -> {
  // stuff that only happens if false
  // if I die still false.

   return true;
});

我注意到有一些新的功能构造,但我不确定它们是否是我正在寻找的。​​p>

任何新结构都可以做到这一点吗?如果有,请举个例子。

更新 为了简化我的问题,我试图找到一种不易出错的方法,以“为对象执行一次”或(真的)惰性初始化方式来保护代码,我知道我团队中的一些开发人员发现 compareAndSet令人困惑。

【问题讨论】:

  • 这里的falsetrue 是什么?您是否将Boolean 存储在AtomicReference 中?如果是这样,您应该改用AtomicBoolean
  • @Misha 更新问题
  • @xenoterracide 你或我错过了一些东西。延迟初始化必然需要Atomic API 没有的锁定机制。为什么 ?仅仅因为系统调用者需要初始化系统,并且应该在初始化完成后立即阻止调用者或其他线程所做的一切。您是否应该依赖一个响应式/承诺 API,例如 CompetableFuture
  • @LoganMzz 也许,我愿意接受答案形式的代码示例。我真的几乎没有看过未来的东西,而且例子似乎比网络上的其他东西少,我发现的少数几个建议线程池,这对于这个来说是多余的。
  • 我还是一头雾水。你需要多线程支持吗? Atomic API(和所有并发工具)仅适用于多个线程竞争相同资源的情况。否则,在初始化过程中其他线程应该做什么?

标签: java java-8


【解决方案1】:

“为对象执行一次”中的保护代码

具体如何实现这取决于您希望其他线程同时尝试执行相同的事情。如果你只是让它们跑过 CAS,它们可能会在一个成功的线程执行其操作时观察处于中间状态的事物。

或者(真的)惰性初始化方式

如果您将它用于惰性初始化器,则该构造不是线程安全的,因为“已初始化”布尔值可能由一个线程设置为 true,然后在另一个线程观察真实状态但读取空结果。

如果可以接受多个并发/重复初始化尝试,其中一个对象最终获胜而其他对象被 GC 丢弃,则可以使用Atomicreference::updateAndGet。更新方法应该没有副作用。

否则,您应该只使用带有变量引用字段的双重检查锁定模式。

当然,您始终可以将其中任何一个打包到返回 RunnableSupplier 的高阶函数中,然后将其分配给最终字段。

// ==  FunctionalUtils.java

/** @param mayRunMultipleTimes must be side-effect-free */
public static <T> Supplier<T> instantiateOne(Supplier<T> mayRunMultipleTimes) {
  AtomicReference<T> ref = new AtomicReference<>(null);

  return () -> {
    T val = ref.get(); // fast-path if already initialized
    if(val != null)
      return val;
    return ref.updateAndGet(v -> v == null ? mayRunMultipleTimes.get() : v)
  };

}


// == ClassWithLazyField.java

private final Supplier<Foo> lazyInstanceVal = FunctionalUtils.instantiateOne(() -> new Foo());

public Foo getFoo() {
  lazyInstanceVal.get();
}

您可以通过这种方式轻松封装各种自定义控制流和锁定模式。 Here are two of my own..

【讨论】:

    【解决方案2】:

    compareAndSet 如果更新完成则返回 true,如果实际值不等于预期值则返回 false。

    所以只需使用

    if (ref.compareAndSet(expectedValue, newValue)) { 
        ... 
    }
    

    也就是说,我不太了解您的示例,因为您将 true 和 false 传递给将对象引用作为参数的方法。您的第二个示例与第一个示例不同。如果第二个是你想要的,我想你所追求的是

    ref.getAndUpdate(value -> {
        if (value.equals(expectedValue)) {
            return someNewValue(value);
        }
        else {
            return value;
        }
    });
    

    【讨论】:

    • 我更新了我的问题以使其更清楚(我认为),但不确定它是否会改变您的答案。
    • 鉴于您的附录,您不应使用 AtomicReference,而应使用 AtomicBoolean,并使用 if (atomicBoolean.compareAndSet(false, true)) { initializeOnce() }
    【解决方案3】:

    你把事情复杂化了。仅仅因为现在有了 lambda 表达式,你就不需要用 lambdas 解决所有问题了:

    private volatile boolean initialized;
    …
    if(!initialized) synchronized(this) {
        if(!initialized) {
            // stuff to be done exactly once
            initialized=true;
        }
    }
    

    双重检查锁定可能没有很好的声誉,但对于非static属性,几乎没有替代品。

    如果您考虑多个线程在未初始化状态下同时访问它,并希望保证该操作只运行一次,并且在执行相关代码之前它已经完成,Atomic… 对象帮不了你。

    只有一个线程可以成功执行compareAndSet(false,true),但由于失败意味着该标志已经具有新值,即已初始化,所有其他线程将继续执行,就好像“要执行的东西”一样只完成一次”已经完成,而它可能仍在运行。另一种方法是先读取标志并有条件地执行这些东西,然后再执行compareAndSet,但这允许多个并发执行“东西”。这也是 updateAndGetaccumulateAndGet 所发生的情况,它提供了函数。

    为了保证在继续之前准确执行一次,如果“东西”当前正在执行,线程必须被阻塞。上面的代码就是这样做的。请注意,一旦“东西”完成,将不再有锁定,volatile 读取的性能特征与Atomic… 读取相同。

    编程更简单的唯一解决方案是使用ConcurrentMap

    private final ConcurrentHashMap<String,Boolean> initialized=new ConcurrentHashMap<>();
    …
    initialized.computeIfAbsent("dummy", ignore -> {
        // stuff to do exactly once
        return true;
    });
    

    它可能看起来有点过大,但它提供了所需的性能特征。它将使用synchronized(或者说是依赖于实现的排除机制)保护初始计算,但在后续查询中使用volatile 语义执行单次读取。

    如果您想要一个更轻量级的解决方案,您可以继续使用此答案开头显示的双重检查锁定......

    【讨论】:

    • 只是对我正在查看 lambda 的原因发表评论,这是因为我(和其他人)忘记在最后设置值,一直在努力寻找更好的方法来不忘记做事,API 让做错事变得太容易了。而要求您返回值的 lambda 让您很难忘记这样做。
    • 那你应该把代码分开。看,第一个例子只包含六行——它很容易维护。只需将“stuff”注释替换为单个方法调用,代码就会保持可维护性。在方法中,代码可以随心所欲地复杂化,但你不能忘记赋值,因为方法不负责它。这与 lambda 的原理相同,尤其是与第二个示例一样,因为实际返回值无关紧要。这只是关于划分职责,而使用方法或 lambda 并不重要。
    • 这更像是一种我们在多个对象中不断重复的模式。所以说你不能忘记,因为它不负责任......好吧,不,我只是再次实施它并再次忘记。条件里面的东西是 1,或者在某些情况下是 2 或 3 个封装的方法(后者是遗留的)。
    【解决方案4】:

    我知道这是旧的,但我发现没有完美的方法来实现这一点,更具体地说是:


    试图找到一种不易出错的方法来保护“一次执行(任何)...”中的代码


    我将补充说“同时尊重先发生的行为”。这是在您的情况下实例化单例所必需的。

    IMO 实现这一目标的最佳方法是通过同步函数:

    public<T> T transaction(Function<NonSyncObject, T> transaction) {
        synchronized (lock) {
            return transaction.apply(nonSyncObject);
        }
    }
    

    这允许在给定对象上执行原子“事务”。

    其他选项是双重检查自旋锁:

    for (;;) {
        T t = atomicT.get();
        T newT = new T();
        if (atomicT.compareAndSet(t, newT)) return;
    }
    

    在这个new T();会被重复执行,直到值设置成功,所以它并不是真正的“一次做某事”。

    这仅适用于写入事务的复制,并且可以通过调整代码来帮助“实例化对象一次”(实际上是实例化了许多对象,但最终引用了相同的对象)。

    最后一个选项是第一个选项的性能最差的版本,但这个选项确实发生在 AND ONCE 之前(与双重检查自旋锁相反):

    public void doSomething(Runnable r) {
        while (!atomicBoolean.compareAndSet(false, true)) {}
        // Do some heavy stuff ONCE
        r.run();
        atomicBoolean.set(false);
    }
    

    第一个是更好的选择的原因是它正在做这个做的事情,但是以更优化的方式。

    附带说明一下,在我的项目中,我实际上使用了下面的代码(类似于@the8472 的答案),当时我认为是安全的,它可能是:

    public T get() {
        T res = ref.get();
        if (res == null) {
            res = builder.get();
            if (ref.compareAndSet(null, res))
                return res;
            else
                return ref.get();
        } else {
            return res;
        }
    }
    

    关于这段代码的事情是,作为写时复制循环,这个生成多个实例,每个竞争线程一个,但只有一个被缓存,第一个,所有其他构造最终都会被 GC'd。

    看看 putIfAbsent 方法,我发现它的好处是跳过了 17 行代码,然后是一个同步体:

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
    

    然后同步体本身就是另外34行:

                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
    

    使用 ConcurrentHashMap 的优点是它无疑可以工作。

    【讨论】:

      猜你喜欢
      • 2013-10-14
      • 2020-06-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-02-19
      • 1970-01-01
      • 2014-06-28
      • 2016-07-25
      相关资源
      最近更新 更多