【问题标题】:Least value concurrent map最小值并发映射
【发布时间】:2026-01-12 10:30:01
【问题描述】:

我需要一个地图的实现, 支持并发,并且只存储最少/最多的附加值(取决于比较器)。 下面的代码可以工作吗?

 class LeastValConcurrentMap<K, V> {

  //put the least value
  private final Comparator<V> comparator;
  private final ConcurrentHashMap<K, V> map = new ConcurrentHashMap<K, V>();

  LeastValConcurrentMap(Comparator comparator) {
     this.comparator = comparator;
  }

  public void put(K k, V v)  {
     V vOld = map.put(k, v);
     if (vOld == null || comparator.compare(v, vOld) <= 0) //i.e. v <= vOld so better
        return;
     //recursively call self
     put(k, vOld);
  }

  @Override
  public String toString() {
     return map.toString();
  }
}

您能否举例说明它在哪里/为什么不起作用? 番石榴或标准 java 库中有什么我可以使用的吗?

【问题讨论】:

  • 我相信,你想要 V vOld = map.get(k, v);
  • 你需要同步 LeastValConcurrentMap.put(K k, V v) 方法,因为 ConcurrentHashMap 可能是线程安全的,但你的方法不是。
  • 你有一个竞争条件。考虑当两个线程同时放置相同的键时会发生什么。即他们都在运行 put()
  • 这就是递归方法出现的地方,再次调用自身并检查 v 是否仍然是最小值
  • 假设 key:val , concurrentMap 包含 k1:3, thread1 puts val k1:2, vOld = 3 返回, thread2 puts k1:1, (put 是原子的, 在我的理解中), vOld = 2返回...

标签: java guava java.util.concurrent concurrenthashmap


【解决方案1】:

我觉得比较复杂,需要用atomicConcurrentHashMap.replace(K key, V oldValue, V newValue)

public void put(K k, V v) {
    V oldValue = map.putIfAbsent(k, v);
    if (oldValue == null) {
        // this is the first mapping to this key 
        return;
    }
    for (;;) {
        if (comparator.compare(v, oldValue) <= 0) {
            break;
        }
        // this replace returns true only if oldValue was replaced with new value atomically   
        if (map.replace(k, oldValue, v)) {
            break;
        }
        // otherwise another attempt
        oldValue = map.get(k);
    }

【讨论】:

  • 看起来不错,我不确定您的答案中的比较器行,如果它不是比较器.compare(oldValue, v)
  • 但是我只是从你的代码中拿了一个:),当然,根据所需的逻辑进行更改
  • 不知道你有没有注意到我代码中的递归?你能给我一个我的不工作的例子吗
  • 这里 - put(k, vOld);当你把它放在另一个线程干扰并且已经插入了一个较小的值时。
  • 请注意,逻辑似乎很复杂。但这是使其无锁的唯一方法
【解决方案2】:

您可能需要同步 LeastValConcurrentMap 类的 put 方法。看来您只是将值放入地图中。这些值将在哪里/如何使用。为了确保并发访问,您需要考虑读/写操作。你的 put 方法的最后一行也应该像 map.put(k, vOld)

【讨论】:

  • 请注意我的方法是递归的
  • 你能解释一下为什么它需要递归
  • 它需要再次检查,以防另一个线程将较低的值放入其中
  • 这就是为什么你需要同步你的 put 方法。这样,直到一个线程离开该方法,另一个线程才会进入该方法。当您进行递归调用并尝试将较低的值放回地图时,地图有可能在您再次放回之前被其他人读取......在这种情况下,它会有错误的值
  • 如果您必须将synchronized 构造与Concurrent* 容器结合使用,那么您可能做错了什么。