【问题标题】:Replace synchronized with atomic+while loop in case of low lock contention在低锁竞争的情况下,用 atomic+while 循环替换同步
【发布时间】:2018-11-07 19:20:46
【问题描述】:

我有两个必须在临界区运行的函数:

public synchronized void f1() { ... }
public synchronized void f2() { ... }

假设行为如下:

  • f1 几乎从不被调用。实际上,在正常情况下,这个方法永远不会被调用。如果无论如何调用f1,它应该很快返回。
  • f2 的调用率非常高。它很快就会返回。
  • 这些方法从不相互调用,也没有重入。

换句话说,竞争非常低。所以当f2 被调用时,我们有一些开销来获取锁,在 99.9% 的情况下会立即授予。我想知道是否有办法避免这种开销。

我想出了以下替代方案:

private final AtomicInteger lock = new AtomicInteger(0);

public void f1() {
    while (!lock.compareAndSet(0, 1)) {}

    try {
        ...
    } finally {
        lock.set(0);
    }
}

public void f2() {
    while (!lock.compareAndSet(0, 2)) {}

    try {
        ...
    } finally {
        lock.set(0);
    }
}

还有其他方法吗? java.util.concurrent 包本身是否提供了一些东西?

更新

虽然我的意图是提出一个笼统的问题,但关于我的情况的一些信息:

f1:如果由于某种原因当前的远程流损坏,例如由于超时,此方法会创建一个新的远程流。远程流可以被认为是一个套接字连接,它消耗从给定位置开始的远程队列:

private Stream stream;

public synchronized void f1() {
     final Stream stream = new Stream(...);

     if (this.stream != null) {
         stream.setPosition(this.stream.getPosition());
     }
     this.stream = stream;
     return stream;
}

f2:此方法推进流位置。这是一个简单的设置器:

public synchronized void f2(Long p) {
    stream.setPosition(p);
}

这里,stream.setPosition(Long) 也被实现为一个普通的 setter:

public class Stream {
    private volatile Long position = 0;

    public void setPosition(Long position) {
        this.position = position;
    }
}

Stream中,当前位置会定期异步发送到服务器。注意Stream不是我自己实现的。

我的想法是引入如上图所示的比较和交换,并将stream标记为volatile

【问题讨论】:

  • 为什么要同步?什么资源被同时访问?这是你应该关注的
  • @JeanLogeart 我遵循您的思考过程,但我想从问题中省略这一点。也就是说,认为这是一个通用/教育问题。
  • 那些while 循环变成繁忙循环,直到锁定值被更新。分析是否显示瓶颈?
  • @AndrewS 在这个特定场景中,f2 中的while 循环几乎总是if。同样,这是一个通用问题。
  • 如果要互斥,synchonized是更好更自然的选择。您最好阻塞线程并且不消耗任何 CPU,而不是燃烧 CPU 周期来检查您是否不再被阻塞。这些循环可用于其他线程的计算。

标签: java concurrency java.util.concurrent


【解决方案1】:

你的例子没有做你想做的事。当锁 is 被使用时,您实际上正在执行您的代码。试试这样的:

public void f1() {
    while (!lock.compareAndSet(0, 1)) {
    }

    try {
        ...
    } finally {
        lock.set(0);
    }
}

为了回答你的问题,我认为这不会比使用synchronized 方法更快,而且这种方法更难阅读和理解。

【讨论】:

  • 糟糕,谢谢。我改变了我的问题以反映这一点。
  • @pbillen 好吧,明白了。您是否使用更新版本进行了测试?我不希望它比简单的synchronized 方法快得多(如果有的话)。
  • “我认为这不会比使用同步方法更快” 不正确。使用 synchronized 比 atomic int 版本慢大约 50%,至少在我的机器上是这样
  • 以百分比表示,这要快得多,但如果锁定部分不是微不足道的,您将不会注意到差异。如果它是微不足道的,你最好将操作设为原子而不是像这样包装。
  • @JeffBrower 我相信这取决于调用方法的速率和计算复杂度。如前所述,f2 的调用率非常高,并且返回速度非常快。
【解决方案2】:

根据描述和您的示例代码,我推断出以下内容:

  1. Stream 有自己的内部位置,并且您还在外部跟踪最新的position。您将其用作一种“恢复点”:当您需要重新初始化流时,将其推进到该点。

  2. 最后一个已知的position 可能已过时;我假设这是基于您的断言,即流会定期异步通知服务器其当前位置。

  3. 在调用f1 时,已知流处于错误状态。

  4. 函数f1f2 访问相同的数据,并且可以同时运行。但是,f1f2 都不会同时针对自身运行。换句话说,您几乎拥有一个单线程程序,除了f1f2 都在执行的极少数情况。

    [旁注:我的解决方案实际上并不关心f1 是否与其自身同时被调用;它只关心f2 不会与其自身同时调用]

如果其中任何一个是错误的,那么下面的解决方案就是错误的。哎呀,无论如何它可能是错误的,要么是因为遗漏了一些细节,要么是因为我犯了一个错误。编写低锁定代码很困难,这正是您应该避免使用它的原因,除非您观察到实际的性能问题

static class Stream {
    private long position = 0L;

    void setPosition(long position) {
        this.position = position;
    }
}

final static class StreamInfo {
    final Stream stream = new Stream();
    volatile long resumePosition = -1;

    final void setPosition(final long position) {
        stream.setPosition(position);
        resumePosition = position;
    }
}

private final Object updateLock = new Object();
private final AtomicReference<StreamInfo> currentInfo = new AtomicReference<>(new StreamInfo());

void f1() {
    synchronized (updateLock) {
        final StreamInfo oldInfo = currentInfo.getAndSet(null);
        final StreamInfo newInfo = new StreamInfo();

        if (oldInfo != null && oldInfo.resumePosition > 0L) {
            newInfo.setPosition(oldInfo.resumePosition);
        }

        // Only `f2` can modify `currentInfo`, so update it last.
        currentInfo.set(newInfo);

        // The `f2` thread might be waiting for us, so wake them up.
        updateLock.notifyAll();
    }
}

void f2(final long newPosition) {
    while (true) {
        final StreamInfo s = acquireStream();

        s.setPosition(newPosition);
        s.resumePosition = newPosition;

        // Make sure the stream wasn't replaced while we worked.
        // If it was, run again with the new stream.
        if (acquireStream() == s) {
            break;
        }
    }
}

private StreamInfo acquireStream() {
    // Optimistic concurrency: hope we get a stream that's ready to go.
    // If we fail, branch off into a slower code path that waits for it.
    final StreamInfo s = currentInfo.get();
    return s != null ? s : acquireStreamSlow();
}

private StreamInfo acquireStreamSlow() {
    synchronized (updateLock) {
        while (true) {
            final StreamInfo s = currentInfo.get();

            if (s != null) {
                return s;
            }

            try {
                updateLock.wait();
            }
            catch (final InterruptedException ignored) {
            }
        }
    }
}

如果流发生故障并被f1 替换,则之前对f2 的调用可能仍在(现已失效的)流上执行某些操作。我假设这没问题,并且它不会引入不良的副作用(除了那些已经存在于基于锁的版本中的副作用)。我做出这个假设是因为我们已经在上面的列表中确定了您的恢复点可能已过时,并且我们还确定了 f1 仅在已知流处于错误状态时才被调用。

根据我的 JMH 基准,这种方法比 CAS 或同步版本(它们本身非常接近)快大约 3 倍。

【讨论】:

  • 感谢您的详细解答,非常感谢。有一条“快”和“慢”路径来获取流的想法是一个有趣的想法。不过,我不确定上面的代码是否正确。让我解释。假设线程t1 正在执行f1 并且即将执行currentInfo = newInfo;。现在,线程t2 执行f2(position)。假设acquireStream() 可以快速返回当前流。现在调用s.setPosition(newPosition);s.resumePosition = newPosition; 将永远丢失,因为t1f1 中不再看到它。还是我错过了什么?
  • 你是对的,并且代码假设 f1 仅在已知流发生故障时才被调用,因此 f2 完成的任何并发工作都“丢失”(在某种意义上由于“旧”流已损坏,它将无法完成)。
  • 是的,我确实考虑了这个推断的假设。虽然我不确定我是否理解你的假设以及它是否实用。通常,f1 仅在流/套接字中断时调用,例如由于断开连接。除非你有办法“暂停”其他线程(可能调用f2),否则f1f2 之间总是有一个临界区?因此,我不太确定“代码假定 f1 仅在已知流发生故障时才被调用”是什么意思。
  • 我的意思是,如果f1 仅在流处于不良状态时被调用,那么f2 与旧流同时运行又有什么关系呢?当f2 尝试修改或使用(坏)流时,它不会失败吗?如果这是一个有缺陷的假设,那么正如你所说,我的解决方案是错误的。
  • 主要关注的是位置继承自旧流。正如您在代码 sn-p 中看到的,Stream.setPosition(pos) 实际上是一个简单的 setter。是不做任何网络IO。因此,每当创建新流时,我想确保它始终从旧(坏)流继承位置。这就是为什么我认为f1f2 之间存在一个无法避免的关键部分。
【解决方案3】:

另一种方法是使用时间戳锁,其作用类似于修改计数。如果您具有较高的读写比率,这将非常有效。

另一种方法是拥有一个通过 AtomicReference 存储状态的不可变对象。如果您有非常高的读写比率,这很有效。

【讨论】:

  • @pbillen 了解f1f2 之间的行为差​​异会很有用。例如,如果其中只有一个执行修改,那么这是有用的信息,因为这是我们可以优化的。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-07-16
  • 2013-10-16
  • 2012-03-31
  • 2018-01-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多